using DTS.Common.Utilities.Logging; using DTS.Common.Utils; using DTS.DASLib.Command.SLICE.MulticastCommands; using DTS.Serialization.IRIGCH10.Packets; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; namespace DTS.Serialization.IRIGCH10 { public class CH10AnalogStreamDecode { private static object MyLock = new object(); private readonly ManualResetEvent _stopListening = new ManualResetEvent(false); public string MulticastReceiveAddress { get; set; } = MulticastCommandBase.DEFAULT_RECEIVE_ADDRESS; public int ResponsePort { get; set; } = (int)MulticastCommandBase.Ports.Response; private Task _listeningTask = null; public IPAddress BindToAdapterIPAddress { get; set; } = IPAddress.Any; /// /// starts listening for UDP stream packets /// public void StartListening() { lock (MyLock) { _stopListening.Set(); if (null != _listeningTask) { _listeningTask.Wait(); } _stopListening.Reset(); _listeningTask = Task.Run(ListenThread); } } public delegate void TimePacketDelegate(TimePacketFormat2 packet); public delegate void AnalogDataPacketDelegate(AnalogDataFormat1Packet packet); public delegate void TMATSPacketDelegate(TMATSPacket packet); public delegate void BadCRCDelegate(IPacketHeader packet); /// /// action to perform when a packet is received but has a bad CRC /// public BadCRCDelegate OnBadCRC; /// /// action to perform when a time packet is received /// public TimePacketDelegate OnTimePacket; /// /// action to perform when an analog data packet is received /// public AnalogDataPacketDelegate OnAnalogPacket; /// /// action to perform when a tmats packet is received /// public TMATSPacketDelegate OnTMATSPacket; private void ListenThread() { var rxGroupAddress = IPAddress.Parse(MulticastReceiveAddress); var endPoint = new IPEndPoint(BindToAdapterIPAddress, ResponsePort); var receiver = new UdpClient(); receiver.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); if (BindToAdapterIPAddress == IPAddress.Any) { receiver.ExclusiveAddressUse = false; } try { receiver.Client.Bind(endPoint); } catch (Exception ex) { APILogger.Log(ex); return; } try { if (BindToAdapterIPAddress == IPAddress.Any) { receiver.JoinMulticastGroup(rxGroupAddress); } else { receiver.JoinMulticastGroup(rxGroupAddress, BindToAdapterIPAddress); } } catch (Exception ex) { APILogger.Log(ex); } var data = new byte[0]; IAsyncResult asyncResult = null; do { if (BindToAdapterIPAddress != IPAddress.Any && !NetworkUtils.IsNetworkInterfaceUp(BindToAdapterIPAddress)) { Thread.Sleep(100); continue; } asyncResult = receiver.BeginReceive(null, null); asyncResult.AsyncWaitHandle.WaitOne(100); if (asyncResult.IsCompleted) { try { IPEndPoint remoteEP = null; data = receiver.EndReceive(asyncResult, ref remoteEP); data = CombineBytesIfNeeded(data); ParseBytes(data); } catch (Exception ex) { APILogger.Log(ex); } } } while (!_stopListening.WaitOne(0, false)); receiver.Close(); } private byte[] _leftOvertBytes = new byte[0]; private static object LEFT_OVER_BYTES_LOCK = new object(); private void QueueBytes(byte[] bytes) { lock (LEFT_OVER_BYTES_LOCK) { var newBytes = new byte[bytes.Length + _leftOvertBytes.Length]; Buffer.BlockCopy(_leftOvertBytes, 0, newBytes, 0, _leftOvertBytes.Length); Buffer.BlockCopy(bytes, 0, newBytes, _leftOvertBytes.Length, bytes.Length); _leftOvertBytes = bytes; } } private byte[] CombineBytesIfNeeded(byte[] newData) { lock (LEFT_OVER_BYTES_LOCK) { var newBytes = new byte[newData.Length + _leftOvertBytes.Length]; Buffer.BlockCopy(_leftOvertBytes, 0, newBytes, 0, _leftOvertBytes.Length); Buffer.BlockCopy(newData, 0, newBytes, _leftOvertBytes.Length, newData.Length); return newBytes; } } private void ParseBytes(byte[] bytes) { var currentIndex = 0L; while (currentIndex < bytes.Length) { var nextIndex = Chapter10File.ReadChapter10PacketHeader(bytes, currentIndex, out var header); if (header.PacketSyncPattern != PacketHeader.EXPECTED_SYNC_PATTERN) { currentIndex = Chapter10File.GetIndexOfNextPacket(bytes, currentIndex); } else { if (header.CheckSum != header.ComputeCheckSum()) { OnBadCRC?.Invoke(header); continue; } if (header.PacketLength > bytes.Length) { QueueBytes(bytes); return; } var buffer = new byte[header.PacketLength]; Buffer.BlockCopy(bytes, Convert.ToInt32(currentIndex), buffer, 0, Convert.ToInt32(header.PacketLength)); switch (header.DataFileType) { case Enums.DataFileDataTypes.ComputerGeneratedDataFormat0: case Enums.DataFileDataTypes.ComputerGeneratedDataFormat1: APILogger.Log("TMATS packet received"); var tmatsPacket = new TMATSPacket(buffer); OnTMATSPacket?.Invoke(tmatsPacket); break; case Enums.DataFileDataTypes.AnalogDataFormat1: var analog = new AnalogDataFormat1Packet(buffer); OnAnalogPacket?.Invoke(analog); break; case Enums.DataFileDataTypes.TimeDataFormat2: var timePacket = new TimePacketFormat2(buffer); OnTimePacket?.Invoke(timePacket); break; default: APILogger.Log($"unknown header file type: {header.DataFileType}, "); break; } currentIndex = nextIndex; } } } /// /// stops listening for stream packets /// public void StopListening() { lock (MyLock) { _stopListening.Set(); } } } }