using System;
using System.Linq;
using DTS.Common.Constant;
using DTS.Common.Enums.Communication;
using DTS.Common.Enums.DASFactory;
using DTS.Common.ICommunication;
using DTS.Common.Utilities;
using DTS.Common.Utilities.Logging;

namespace DTS.DASLib.Command.SLICE.RealtimeCommands
{
    /// <summary>
    /// This "command" gets the next set of realtime samples.
    /// Note that in streaming mode no actual command is sent; we just pick up
    /// whatever data is in the RECV buffer (see <see cref="SyncExecute"/>).
    /// </summary>
    public class RealtimeStreamingNextSamples : CommandBase, IGetRealtimeSamples
    {
        /// <summary>
        /// Creates the command on the given communication object using the base class default timeout.
        /// </summary>
        /// <param name="sock">communication object handed to the base command</param>
        public RealtimeStreamingNextSamples(DTS.Common.Interface.DASFactory.ICommunication sock)
            : base(sock)
        {
            baseCommand = new CommandPacket();
        }

        /// <summary>
        /// Creates the command on the given communication object with an explicit timeout.
        /// </summary>
        /// <param name="sock">communication object handed to the base command</param>
        /// <param name="msTimeout">timeout in milliseconds, handed to the base command</param>
        public RealtimeStreamingNextSamples(DTS.Common.Interface.DASFactory.ICommunication sock, int msTimeout)
            : base(sock, msTimeout)
        {
            baseCommand = new CommandPacket();
        }

        // Not read anywhere in this class; presumably consumed by callers or required by
        // IGetRealtimeSamples - TODO confirm.
        public bool DigitalInput { get; set; }

        // Not read anywhere in this class; presumably consumed by callers or required by
        // IGetRealtimeSamples - TODO confirm.
        public bool[] TransitionMode { get; set; }

        /// <summary>
        /// The decoded realtime sample data of the most recently accepted packet.
        /// Null when the last call to <see cref="ProcessData"/> rejected the packet or failed.
        /// </summary>
        public RealtimeStreamDecoder RtData { get; private set; }

        /// <summary>
        /// These are both used in garbage packet detection.
        /// Sample numbers are always increasing, sequence numbers are increasing but wrap around at ushort.max.
        /// We could use only the sample number for garbage packet detection, but the number of samples
        /// per packet can be variable (especially with different SPS and number of channels) while the
        /// sequence is always a +1 increase.
        /// </summary>
        private ulong _lastProcessedSampleNumber = ulong.MaxValue;
        private ushort _lastSequenceNumber = 0;

        /// <summary>
        /// This is a fairly arbitrary choice and is only used for garbage packet detection.
        /// In practical use we are unlikely to drop more than 50 sequences at 2k sps,
        /// so if we see more than a 1k difference there's a good chance the packet is garbage.
        /// </summary>
        private const int MAX_SEQUENCE_DELTA = 1000;

        /// <summary>
        /// Number of distinct values of a ushort sequence counter; used to measure
        /// sequence distance across the wrap-around point.
        /// </summary>
        private const int SEQUENCE_MODULUS = ushort.MaxValue + 1;

        /// <summary>
        /// When true the raw values are reinterpreted as signed shorts as-is;
        /// when false they are shifted down by 0x8000 before being stored.
        /// </summary>
        public bool SignedData { get; set; } = false;

        /// <summary>
        /// Distance between two sequence numbers, taking the shorter way around the ushort circle,
        /// so that a legitimate wrap (e.g. 65535 -> 1) counts as a small step instead of a huge jump.
        /// </summary>
        private static int SequenceDistance(ushort current, ushort previous)
        {
            var rawDelta = Math.Abs(current - previous);
            return Math.Min(rawDelta, SEQUENCE_MODULUS - rawDelta);
        }

        /// <summary>
        /// Decodes the bytes currently held in the response into <see cref="RtData"/> and per-channel
        /// sample arrays. The packet is rejected (leaving <see cref="RtData"/> null) when the header CRC
        /// does not match, the response is empty, the sequence number jumps by more than
        /// <see cref="MAX_SEQUENCE_DELTA"/>, the sample number repeats the previously processed one,
        /// or the packet lists no channels. Any exception raised while decoding is swallowed and
        /// also leaves <see cref="RtData"/> null.
        /// </summary>
        public void ProcessData()
        {
            try
            {
#if REALTIME_LOGGING
                var dt = DateTime.Now;
                System.Diagnostics.Trace.WriteLine($"{dt.Hour}:{dt.Minute}:{dt.Second}.{dt.Millisecond} ProcessData");
#endif
                RtData = null;
                System.Threading.Thread.Sleep(20);

                var bytes = baseResponse.ToBytes();

                // recompute the CRCs and compare against the header CRC that arrived with the packet
                var headerCRC = response.HeaderCRC;
                response.ComputeCRCs();
                if (response.HeaderCRC != headerCRC)
                {
                    //garbage packet, don't process
                    return;
                }

                if (!bytes.Any())
                {
                    return;
                }

                RtData = new RealtimeStreamDecoder(bytes);

                // crc matched, but the packet can still be garbage as determined by looking at the sequence #.
                // A sequence number of 0 is always let through.
                var sequenceDelta = SequenceDistance(RtData.SequenceNumber, _lastSequenceNumber);
                var bFailsSequenceCheck = RtData.SequenceNumber != 0 && sequenceDelta > MAX_SEQUENCE_DELTA;
#if REALTIME_LOGGING
                if (bFailsSequenceCheck)
                {
                    System.Diagnostics.Trace.WriteLine("garbage packet");
                }
#endif
                // the last sequence number is updated even for a rejected packet, so a genuine jump
                // in the stream only costs a single packet
                _lastSequenceNumber = RtData.SequenceNumber;

                if (RtData.SampleNumber == _lastProcessedSampleNumber || bFailsSequenceCheck)
                {
#if REALTIME_LOGGING
                    if (RtData.SampleNumber == _lastProcessedSampleNumber)
                    {
                        System.Diagnostics.Trace.WriteLine("Same packet #");
                    }
#endif
                    RtData = null;
                    return;
                }

#if REALTIME_LOGGING
                System.Diagnostics.Trace.WriteLine($"Processed: {RtData.SampleNumber}");
#endif
                _lastProcessedSampleNumber = RtData.SampleNumber;

                // a packet that lists no channels cannot be de-interleaved; treat it as unusable
                var channelsInPacket = RtData.Channels.Length;
                if (channelsInPacket == 0)
                {
                    RtData = null;
                    return;
                }

                // one row per DAS channel, each sized for the samples carried by this packet
                var samplesPerChannel = SamplesReturned;
                ChannelData = new short[Channels][];
                for (var idx = 0; idx < Channels; idx++)
                {
                    ChannelData[idx] = new short[samplesPerChannel];
                }

                for (var i = 0; i < RtData.RtData.Length; i++)
                {
                    //data is in the form of ABC where A is channel 0, and C is channel 2, so we can
                    //figure out the channel idx using modulo
                    var channelIdx = RtData.Channels[i % channelsInPacket];

                    //per other realtime functions, we transform the ushort to a short
                    short adc;
                    if (SignedData)
                    {
                        adc = (short)RtData.RtData[i];
                    }
                    else
                    {
                        adc = (short)(RtData.RtData[i] - 0x8000);
                    }

                    //data is in the form of ABC, so what sample we are on from the first sample can be
                    //computed by (integer) dividing by the number of channels in the response
                    var insertPt = i / channelsInPacket;
                    ChannelData[channelIdx][insertPt] = adc;
                }
            }
#if REALTIME_LOGGING
            catch (Exception ex)
            {
                System.Diagnostics.Trace.WriteLine(ex.Message);
                RtData = null;
            }
#else
            catch (Exception)
            {
                // best effort: a packet that cannot be decoded is simply dropped
                RtData = null;
            }
#endif
        }

        /// <summary>
        /// The sample number of the first sample in the data stream.
        /// Throws a NullReferenceException when <see cref="RtData"/> is null.
        /// </summary>
        public ulong SampleNumber => RtData.SampleNumber;

        /// <summary>
        /// The time stamp reported by the decoded packet.
        /// Throws a NullReferenceException when <see cref="RtData"/> is null.
        /// </summary>
        public ulong TimeStamp => RtData.TimeStamp;

        /// <summary>
        /// The sequence number reported by the decoded packet.
        /// Throws a NullReferenceException when <see cref="RtData"/> is null.
        /// </summary>
        public ulong SequenceNumber => RtData.SequenceNumber;

        /// <summary>
        /// All channel data for the DAS. Note that if a channel is not in realtime
        /// then its short data values are not defined.
        /// This is all channels.
        /// </summary>
        private short[][] ChannelData { get; set; }

        /// <summary>
        /// Gets the sample data for a given channel index.
        /// </summary>
        /// <param name="zeroBasedChannel">zero based index into the DAS channels</param>
        /// <returns>the samples stored for that channel by the last successful <see cref="ProcessData"/></returns>
        public short[] GetChannelData(int zeroBasedChannel)
        {
            return ChannelData[zeroBasedChannel];
        }

        /// <summary>
        /// This is the total number of channels on the DAS.
        /// This is used to build ChannelData.
        /// </summary>
        public ushort Channels { get; set; }

        /// <summary>
        /// Count of how many samples have been performed.
        /// If data is ABCABCABC, then there are 3 samples returned.
        /// Returns 0 when there is no decoded packet or the packet lists no channels.
        /// </summary>
        public int SamplesReturned
        {
            get
            {
                var decoded = RtData;
                if (decoded == null)
                {
                    return 0;
                }

                var valueCount = decoded.RtData.Length;
                var channelCount = decoded.Channels.Length;
                return channelCount == 0 ? 0 : valueCount / channelCount;
            }
        }

        /// <summary>
        /// We need to override SyncExecute because we don't want to send anything. Instead we just want to
        /// read whatever is out there. Otherwise this is mostly cut and paste of normal SyncExecute with
        /// some streamlining for our specific case.
        /// <see cref="ProcessData"/> is always run on the way out, whether or not the read succeeded.
        /// </summary>
        public override void SyncExecute()
        {
            // this is a try/finally to handle the ExecuteIsBusy
            try
            {
                // there can be only one!
                recorder.ExecuteIsBusy = true;

                if (recorder.IsCanceled())
                {
                    throw new CanceledException();
                }

                UserCallback = null;
                UserCallbackData = null;
                IsSynchronous = true;
                SyncEvent.Reset();

                // empty payload: nothing is sent
                recorder.PseudoExecute(new byte[0], ExecuteCallback, null, IO_Timeout);

                var syncExecTimeout = IO_Timeout;
                try
                {
                    if (!WaitWithCondition.Wait(SyncEvent, syncExecTimeout, recorder.CancelEvent))
                    {
                        //timeout
                        LogString("SyncExecute: timeout");
                        throw new TimeoutException(MakeLogString("SyncExecute: timeout"));
                    }
                }
                catch (WaitWithCondition.ConditionMetException)
                {
                    throw new CanceledException();
                }

                // we didn't timeout, check the result
                switch (ComReport.Result)
                {
                    case CommunicationConstantsAndEnums.CommunicationResult.Canceled:
                        throw new CanceledException();

                    case CommunicationConstantsAndEnums.CommunicationResult.ReceiveOK:
                        if (baseResponse == null)
                        {
                            LogString("SyncExecute: ReceiveOK but response==null!");
                            LogCommand(false);
                            // NOTE(review): execution continues and the Status access below will throw a
                            // NullReferenceException in this case - confirm whether a dedicated exception
                            // should be thrown here instead.
                        }

                        if (baseResponse.Status != DFConstantsAndEnums.CommandStatus.StatusNoError)
                        {
                            // didn't go well
                            var msg = MakeLogString("SyncExecute: response.Status = " + baseResponse.Status);
                            LogCommand(false);

                            if (baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidModeForCommand)
                            {
                                throw new CommandException(CommandErrorReason.InvalidMode, msg);
                            }

                            if (baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusUnimplemented
                                || baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidCommand
                                || baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidCommandType)
                            {
                                throw new NotImplementedException(msg);
                            }

                            // any other status is logged but not thrown
                            var ex = new Exception(msg);
                            ex.Data.Add("Status", baseResponse.Status);
                            APILogger.Log(ex);
                        }

                        // everything is fine, let it exit
                        if (LogCommands)
                        {
                            LogCommand(false);
                        }
                        break;

                    case CommunicationConstantsAndEnums.CommunicationResult.ReceiveFailed:
                        {
                            var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result);
                            LogCommand(false);
                            throw new CommandException(CommandErrorReason.ReceiveFailed, msg);
                        }

                    case CommunicationConstantsAndEnums.CommunicationResult.ReceiveTimeout:
                        {
                            // reported with the same reason as a receive failure
                            var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result);
                            LogCommand(false);
                            throw new CommandException(CommandErrorReason.ReceiveFailed, msg);
                        }

                    case CommunicationConstantsAndEnums.CommunicationResult.SendFailed:
                    case CommunicationConstantsAndEnums.CommunicationResult.SendTimeout:
                        {
                            var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result);
                            LogCommand(false);
                            throw new CommandException(CommandErrorReason.SendFailed, msg);
                        }

                    default:
                        {
                            var msg = MakeLogString("SyncExecute: Unknown ComReport.Result == " + ComReport.Result);
                            LogCommand(false);
                            throw new Exception(msg);
                        }
                }
            }
            finally
            {
                recorder.ExecuteIsBusy = false;
                ProcessData();
            }
        }
    }
}