Files
DP44/DataPRO/SLICECommands/RealtimeCommands/StreamReaderUDP.cs

200 lines
7.8 KiB
C#
Raw Normal View History

2026-04-17 14:55:32 -04:00
using DTS.Common.Utilities.Logging;
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using DTS.Common.Enums;
namespace DTS.DASLib.Command.SLICE.RealtimeCommands
{
/// <summary>
/// this class was ported from FWTU
/// </summary>
public class StreamReaderUDP
{
public string StreamAddress { get; }
/// <summary>
/// appears to be the parameters (line) sent to the start command
/// </summary>
public byte[] cmdline { get; set; }
private Socket _udpSocket { get; set; }
/// <summary>
/// IP of our receive endpoint. Either 0.0.0.0 or a specified adapter's IP
/// FB15531: Follow pattern for multicast AutoDiscovery in StreamReader
/// </summary>
public string HostIPAddress { get; set; } = IPAddress.Any.ToString();
public void CloseSocket()
{
try
{
_udpSocket.Close();
}
catch (Exception e)
{
APILogger.LogException(e);
}
}
public UDPStreamProfile UDPStreamType
{
get;
set;
}
private EndPoint _uDPEndpoint;
public EndPoint UDPEndpoint
{
get => _uDPEndpoint;
set => _uDPEndpoint = value;
}
public ulong UDPSampleNumber { get; set; }
public StreamReaderUDP(string streamAddress, string hostAddress, UDPStreamProfile uDPStreamType, byte[] channels)
{
StreamAddress = streamAddress.TrimEnd('/');
UDPSampleNumber = 0;
UDPStreamType = uDPStreamType;
Channels = channels;
HostIPAddress = hostAddress;
Configure();
}
public byte[] Channels { get; set; } = new byte[0];
private void Configure()
{
// setup parameter for command
var channelMaskAndReserved = 0; // default to all channels.
//I'm not sure this is supported yet...
//if (Channels.Any())
//{
// foreach (var ch in Channels)
// {
// channelMaskAndReserved |= 1 << ch;
// }
//}
var channelList = BitConverter.GetBytes(channelMaskAndReserved);
// create parameter for streaming command.
cmdline = new byte[4 + StreamAddress.Length];
var paramNetAddr = Encoding.ASCII.GetBytes(StreamAddress);
Buffer.BlockCopy(channelList, 0, cmdline, 0, channelList.Length);
Buffer.BlockCopy(paramNetAddr, 0, cmdline, channelList.Length,
paramNetAddr
.Length); // System.Buffer.BlockCopy(netAddr.ToArray(), 0, cmdline, channelList.Length, netAddr.Length);
// get IP and port udp://239.1.2.10:portID
var parts = StreamAddress.Split(':');
if (parts.Length != 3)
{
throw new Exception($"Invalid UDP address:{StreamAddress}");
}
// remove '//' or '/' from IP and port
var udphost = parts[1].Trim('/');
var udpport = parts[2].Trim('/');
_udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
var iep = IPAddress.TryParse(HostIPAddress, out var address) ? new IPEndPoint(address, Convert.ToUInt16(udpport)) : new IPEndPoint(IPAddress.Any, Convert.ToUInt16(udpport));
UDPEndpoint = iep;
try
{
_udpSocket.Bind(iep);
}
catch (Exception e)
{
APILogger.LogException(e);
_udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 2000);
_udpSocket.Bind(iep); // retry again.
}
// check for udp broadcast
var ipChecks = udphost.Split('.');
var ipV4Check = Convert.ToInt32(ipChecks[0]);
if ((ipV4Check >= 224) & (ipV4Check <= 239))
{
var mcastOption = new MulticastOption(IPAddress.Parse(udphost));
if (iep.Address != IPAddress.Any)
{
mcastOption = new MulticastOption(IPAddress.Parse(udphost), iep.Address);
}
_udpSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, mcastOption); // "239.1.2.10")));
}
//set timer for recv_socket
_udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 2000);
}
/// <summary>
/// receives any packets waiting
/// returns null if no information is ready, otherwise a structure filled with information
/// </summary>
/// <returns></returns>
public UDPStreamPacket Read()
{
//receive data
var databuf = new byte[2000];
var recv = _udpSocket.ReceiveFrom(databuf, ref _uDPEndpoint);
if (recv <= 0)
{
return null;
}
var data = new byte[recv];
Buffer.BlockCopy(databuf, 0, data, 0, recv);
var udpData = new UDPRealtimeByteConverter(data);
var udpStreamPacket = new UDPStreamPacket();
udpStreamPacket.ChannelData = new short[Channels.Length][];
//fun fact, sending all channels right now, so regardless of how many were configured to send, use this
var numChannels = 6;
//calculate how many samples are in the packet, but use the floor incase it's not complete
var numSamples = Convert.ToUInt64(Math.Floor((double)udpData.RtData.Length / numChannels));
for (var i = 0; i < Channels.Length; i++)
{
udpStreamPacket.ChannelData[i] = new short[numSamples];
}
//forward declarations to reduce a little churn
byte channel = 0;
var list = Channels.ToList();
var sampleIndex = 0UL;
short adc = 0;
for (var i = 0; i < udpData.RtData.Length; i++)
{
//which channel this sample is for
channel = Convert.ToByte(i % numChannels);
//which channel in the list this is for
var channelIdx = list.IndexOf(channel);
//channel is not in list, don't care about this sample
if (channelIdx < 0) { continue; }
//which sample in a sequence of multiple samples this is
sampleIndex = Convert.ToUInt64(Math.Floor((double)i / numChannels));
//we should have all complete packets, but if there's an incomplete round
//this will skip it
if (sampleIndex >= numSamples) { continue; }
//add the sample into the list of samples
adc = (short)udpData.RtData[i];
udpStreamPacket.ChannelData[channelIdx][sampleIndex] = adc;
}
udpStreamPacket.PTPTimesec = Convert.ToUInt32(udpData.PtpTimeStampSec);
udpStreamPacket.PTPTimeNsec = Convert.ToUInt32(udpData.PtpTimeStampNsec);
udpStreamPacket.PTPSyncStatusError = Convert.ToBoolean(udpData.PacketFlags & 0x20);
udpStreamPacket.ADCOverflowStatus = Convert.ToBoolean(udpData.PacketFlags & 0x10);
udpStreamPacket.TimeStamp = 0L;
//packet does not contain a sample number, so we calculate it
udpStreamPacket.SampleNumber = udpData.SequenceNumber * numSamples;
return udpStreamPacket;
}
}
}