//#define LOG_COMM using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Diagnostics; using DTS.Common.DASResource; using DTS.Common.ICommunication; using DTS.Common.Utilities; using DTS.Common.Utilities.Logging; using DTS.Common.Utils; using DTS.Common.Enums.Communication; using DTS.Common.Interface.Communication; using DTS.Common.Enums.DASFactory; using DTS.Common.Events; // ReSharper disable InconsistentNaming namespace DTS.DASLib.Command { /// /// Ribeye, SliceCommand, SliceDBCommand all contained Command objects which /// implemented ICommand /// I extracted as many common things as I could into one common class and formed /// an abstract base class. /// In general slice commands seemed to contain the most up to date code, and /// slice db commands and ribeye seemed to contain older code, so the majority of the code /// is actually from slice commands. /// 6/14/10 - dtm /// public abstract class AbstractCommandBase : ICommand { // this is the buffer we use to accumulate the data coming in until we have a full // response package. protected SecureQueue CommandDataBuffer; protected DTS.Common.Interface.DASFactory.ICommunication recorder; protected ManualResetEvent SyncEvent; protected CommandCallback UserCallback { get; set; } protected object UserCallbackData { get; set; } protected bool IsSynchronous { get; set; } protected CommandPacketBase baseCommand; protected CommandPacketBase baseResponse; protected ICommunicationReport ComReport; protected DFConstantsAndEnums.CommandStatus _status; protected Type ClassType; protected object _debuglock; protected int current_thread_id; protected int MinimumProtocolVersion { get; set; } protected DateTime ExecuteTime { get; set; } protected bool ExecuteIsBusy = false; protected object ExecuteBusyLock = new object(); /// /// whether to log commands or not /// public bool LogCommands { get; set; } /// /// Log Command or response /// /// if true logs command, otherwise logs response protected virtual void LogCommand(bool sending) { try { if (!LogCommands) { return; } APILogger.LogString(GetFormattedLogEntry(sending)); } catch (Exception ex) { LogString($"threw an exception in {(sending ? "CommandToString()" : "ResponseToString()")}"); APILogger.LogException(ex); } } protected virtual string MakeLogString(string msg) { var dasName = ""; if (null != recorder) { dasName = recorder.ToString(); } var cmdName = ""; try { cmdName = GetType().FullName; } catch { // we use unknown } var sequenceNumber = ""; try { sequenceNumber = baseCommand.SequenceNumber.ToString(); } catch { } //17686 add additional debug information into log if (null != baseResponse) { sequenceNumber = $"{sequenceNumber}\\{baseResponse.SequenceNumber}"; } return string.Format(Strings.CmdAbstractCommandGeneralFailure, dasName, cmdName, msg, sequenceNumber); } protected virtual void LogString(string msg) { APILogger.LogString(MakeLogString(msg)); } /// /// status of command /// public DFConstantsAndEnums.CommandStatus Status => _status; protected void EnqueueData(byte[] data) { CommandDataBuffer.Enqueue(data); } protected byte[] DequeueData(bool bResetEvent) { return CommandDataBuffer.Dequeue(bResetEvent); } #if LOG_COMM private static readonly object _COMM_LOCK_ = new object(); #endif /** * Knuth-Morris-Pratt Algorithm for Pattern Matching */ class KMPMatch { /** * Finds the first occurrence of the pattern in the text. */ public int indexOf(byte[] data, byte[] pattern, int startAddress = 0) { int[] failure = computeFailure(pattern); int j = 0; if (data.Length == 0) return -1; for (int i = startAddress; i < data.Length; i++) { while (j > 0 && pattern[j] != data[i]) { j = failure[j - 1]; } if (pattern[j] == data[i]) { j++; } if (j == pattern.Length) { return i - pattern.Length + 1; } } return -1; } /** * Computes the failure function using a boot-strapping process, * where the pattern is matched against itself. */ private int[] computeFailure(byte[] pattern) { int[] failure = new int[pattern.Length]; int j = 0; for (int i = 1; i < pattern.Length; i++) { while (j > 0 && pattern[j] != pattern[i]) { j = failure[j - 1]; } if (pattern[j] == pattern[i]) { j++; } failure[i] = j; } return failure; } } private static readonly byte[] StreamSignature = { 0xFA, 0x04, 0x07, 0x00, 0x00, 0x00, 0x10, 0xF8 }; private static readonly byte[] EndStreamSignature = { 0xFA, 0x04, 0x08, 0x00 }; private const int STREAM_PACKET_SIZE = 63518; protected virtual CommandReceiveAction ReceiveBlockOK(ICommunicationReport report) { lock (_debuglock) { if (-1 != current_thread_id) { } current_thread_id = Thread.CurrentThread.ManagedThreadId; } // use the buffer to assemble a bigger chunk EnqueueData(report.Data); var tempBuffer = DequeueData(false); if (DFConstantsAndEnums.ExtraCommunicationLogging) { APILogger.Log($"ReceivedBlock {recorder?.ConnectString ?? "N/A"}", tempBuffer); } if (baseCommand.GetCommandDescription() == "GetNextDownloadStreamDataSamples") { var match = new KMPMatch(); var index = match.indexOf(tempBuffer, StreamSignature, 1); if (index > 0) { var length = tempBuffer.Length - index; var remainder = new byte[length]; Array.Copy(tempBuffer, index, remainder, 0, length); EnqueueData(remainder); var tmp = new byte[index]; Array.Copy(tempBuffer, 0, tmp, 0, index); tempBuffer = tmp; } else { index = match.indexOf(tempBuffer, EndStreamSignature); if (index >= 0) { var tmp = new byte[index]; Array.Copy(tempBuffer, 0, tmp, 0, index); tempBuffer = tmp; } else { EnqueueData(tempBuffer); return CommandReceiveAction.ContinueReceiving; } } } //there could be multiple packets in here, we have to // a debug test if (null != tempBuffer && tempBuffer.Length > 0 && tempBuffer[0] != 0xFA) { } #if LOG_COMM lock (_COMM_LOCK_) { System.IO.File.AppendAllText(@"Logs\COMM.log", $"{DateTime.Now.Ticks} [IN] {BitConverter.ToString(tempBuffer).Replace("-", string.Empty)}\r\n"); } #endif // what's the state of this packet? // - note I think the command object should be intialized and stable, but // just incase maybe it's safer to check it before using it // 6/14/2010 - dtm var pState = baseCommand?.VerifyPacket(tempBuffer) ?? GetCommandPacket().VerifyPacket(tempBuffer); if (DFConstantsAndEnums.ExtraCommunicationLogging) { APILogger.Log($"VerifyPacket {recorder?.ConnectString ?? "N/A"} {pState.ToString()}"); } switch (pState) { case CommandPacketBase.PacketState.OK: // perfect, a good complete packet baseResponse = GetCommandPacket(tempBuffer); #if LOG_COMM lock (_COMM_LOCK_) { System.IO.File.AppendAllText(@"Logs\COMM.log", $"{DateTime.Now.Ticks} [IN] [OK] seq:{baseResponse.SequenceNumber} {baseCommand.GetCommandDescription()}\r\n"); #endif if (baseCommand.GetCommandDescription() == "EndRealtimeMode") { //if (baseCommand.SequenceNumber != baseResponse.SequenceNumber) { //look for the response KMPMatch match = new KMPMatch(); //look for the command header for end realtime (0x08, 0x02) //if we find it we have a response to end realtime, if not we have //other data and should keep waiting var target = new byte[] { 0xFA, 0x08, 0x02 }; //baseCommand.ToBytes(); var index = match.indexOf(tempBuffer, target); if (index < 0) { #if LOG_COMM System.IO.File.AppendAllText(@"Logs\COMM.log", $"{DateTime.Now.Ticks} - NOT COMPLETE YET\r\n"); #endif //this suggests the response we have is a realtime packet and NOT the end packet return DataTooShort(tempBuffer); } else { #if LOG_COMM System.IO.File.AppendAllText(@"Logs\COMM.log", $"{DateTime.Now.Ticks} - COMPLETE\r\n"); #endif } } } #if LOG_COMM } #endif WholePackage(); if (IsSynchronous) { lock (_debuglock) { current_thread_id = -1; } SyncEvent.Set(); } else { // we're not doing a SyncExecute so we must flag the command done recorder.ExecuteIsBusy = false; WholePackagePost(); lock (_debuglock) { current_thread_id = -1; } } return CommandReceiveAction.StopReceiving; case CommandPacketBase.PacketState.TooShort: // not enough data, keep going lock (_debuglock) { current_thread_id = -1; } return DataTooShort(tempBuffer); case CommandPacketBase.PacketState.Unknown: // not good if (!IsSynchronous) { // we're not doing a SyncExecute so we must flag the command done recorder.ExecuteIsBusy = false; lock (_debuglock) { current_thread_id = -1; } return DataUnknown(report); } SyncEvent.Set(); return CommandReceiveAction.StopReceiving; default: Debug.Assert(false, "CommandBase.ReceiveBlockOK: Unhandled case " + pState.ToString()); return CommandReceiveAction.StopReceiving; } } protected virtual CommandReceiveAction DataTooShort(byte[] dataSoFar) { // we need more data CommandDataBuffer.Enqueue(dataSoFar); return CommandReceiveAction.ContinueReceiving; } protected virtual CommandReceiveAction DataUnknown(ICommunicationReport report) { // a bad one var cbReport = new CommandReport(CommandStatus.Failure, UserCallbackData); LogString("DataUnknown: reporting failure " + report); UserCallback(cbReport); return CommandReceiveAction.StopReceiving; } protected virtual CommandReceiveAction WholePackage() { return CommandReceiveAction.StopReceiving; } protected virtual CommandReceiveAction WholePackagePost() { // we have a whole package, do a default response var stat = CommandStatus.Success; if (baseResponse.Status != DFConstantsAndEnums.CommandStatus.StatusNoError) { var s = (int)baseResponse.Status; LogString("WholePackagePost: reporting failure, status==" + CommandPacketBase.StatusLabels[s] + " (0x" + s.ToString("X") + ")"); stat = CommandStatus.Failure; } var acr = new CommandReport(stat, UserCallbackData); return UserCallback(acr); } protected virtual CommandReceiveAction SendReceiveError(ICommunicationReport report) { if (IsSynchronous) { string result = "null"; if (null != report) { result = report.Result.ToString(); } LogString("SendReceiveError: SyncEvent.Set(), result=" + result); SyncEvent.Set(); } else { if (UserCallback == null) { throw new ApplicationException(Strings.Slice_CommandBase_SendReceiveError_Err1); } LogString("SendReceiveError: reporting failure, report==" + report); var cbReport = new CommandReport(CommandStatus.Failure, UserCallbackData); UserCallback(cbReport); } return CommandReceiveAction.StopReceiving; } /// /// Status of response packet /// public DFConstantsAndEnums.CommandStatus ResponseStatus { get { if (null == baseResponse) return DFConstantsAndEnums.CommandStatus.StatusNoResponse; return baseResponse.Status; } } /// /// string description of status in hex? /// public string StatusString => CommandPacketBase.StatusLabels[(int)_status] + " (0x" + _status.ToString("X") + ")"; /// /// default I/O timeout in ms? /// public const int Default_IO_Timeout = 120000; //System.Threading.Timeout.Infinite; //1000; /// /// timeout for specific commands in ms? /// public int IO_Timeout { get; set; } /// /// note, this was internal, it's not designed to be accessed by more than 1 level of inheritance /// /// /// internal void SetupThis(DTS.Common.Interface.DASFactory.ICommunication sock, int TimeoutMillisec) { ClassType = GetType(); CommandDataBuffer = new SecureQueue(SecureQueue.NullPolicy.SkipNull, "CommandBase.CommandDataBuffer"); recorder = sock; SyncEvent = new ManualResetEvent(false); UserCallback = null; UserCallbackData = null; IsSynchronous = false; baseResponse = null; ComReport = null; LogCommands = true; _status = DFConstantsAndEnums.CommandStatus.StatusUnimplemented; IO_Timeout = TimeoutMillisec; // The first protocol version was 1, so set that as the default here MinimumProtocolVersion = 1; } protected AbstractCommandBase(DTS.Common.Interface.DASFactory.ICommunication sock) { _debuglock = new object(); lock (_debuglock) { current_thread_id = -1; } SetupThis(sock, Default_IO_Timeout); } protected AbstractCommandBase(DTS.Common.Interface.DASFactory.ICommunication sock, int TimeoutMillisec) { _debuglock = new object(); lock (_debuglock) { current_thread_id = -1; } SetupThis(sock, TimeoutMillisec); } /// /// Both Execute and SyncExecute uses this function as their callback (since /// both calls recorder.Execute). /// /// /// true if recorder should wait for more data, false otherwise /// protected virtual bool ExecuteCallback(ICommunicationReport report) { // we should only handle ExecuteIsBusy if we're NOT run thry SyncExecute ComReport = report; switch (report.Result) { case CommunicationConstantsAndEnums.CommunicationResult.ReceiveOK: // ReceiveBlockOK takes care of the ExecuteIsBusy flag return ReceiveBlockOK(report) == CommandReceiveAction.ContinueReceiving; case CommunicationConstantsAndEnums.CommunicationResult.SendTimeout: case CommunicationConstantsAndEnums.CommunicationResult.SendFailed: case CommunicationConstantsAndEnums.CommunicationResult.ReceiveTimeout: case CommunicationConstantsAndEnums.CommunicationResult.ReceiveFailed: // this will always result in termination LogString("ExecuteCallback: " + report.Result); if (!IsSynchronous) { recorder.ExecuteIsBusy = false; } SendReceiveError(report); return false; case CommunicationConstantsAndEnums.CommunicationResult.Canceled: if (IsSynchronous) { SyncEvent.Set(); } else { recorder.ExecuteIsBusy = false; var cbReport = new CommandReport(CommandStatus.Canceled, UserCallbackData); UserCallback(cbReport); } return false; } Debug.Assert(false, "CommandBase.ExecuteCallback: unhandled case " + report.Result); return true; } private void SurfaceError(string msg) { try { PageErrorEvent.SurfaceApplicationError(msg); } catch (Exception ex) { APILogger.Log(ex); } } private Random _random = new Random(DateTime.Now.Second); protected void InternalSyncExecute() { // this is a try/finally to handle the ExecuteIsBusy try { // there can be only one! recorder.ExecuteIsBusy = true; if (recorder.ProtocolVersion < MinimumProtocolVersion) { throw new Exception(Strings.Slice_CommandBase_SyncExecute_Err6); } if (recorder.IsCanceled()) { throw new CanceledException(); } if (baseCommand.AlreadyRun) { baseCommand.GetNextSequenceNumber(); } baseCommand.AlreadyRun = true; baseCommand.ComputeCRCs(); var CommandBytes = baseCommand.ToBytes(); baseCommand.OriginalBytes = CommandBytes; #if LOG_COMM lock (_COMM_LOCK_) { System.IO.File.AppendAllText(@"Logs\COMM.log", $"{DateTime.Now.Ticks} [OUT] seq:{baseCommand.SequenceNumber} {baseCommand.GetCommandDescription()} - {BitConverter.ToString(CommandBytes).Replace("-", string.Empty)}\r\n"); } #endif if (LogCommands) { LogCommand(true); } UserCallback = null; UserCallbackData = null; IsSynchronous = true; SyncEvent.Reset(); ExecuteTime = DateTime.Now; recorder.Execute(CommandBytes, ExecuteCallback, null, IO_Timeout); var syncExecTimeout = IO_Timeout; if (syncExecTimeout != Timeout.Infinite) { syncExecTimeout *= 3; } try { if (!WaitWithCondition.Wait(SyncEvent, syncExecTimeout, recorder.CancelEvent)) { // timeout LogString("SyncExecute: timeout"); throw new TimeoutException(MakeLogString("SyncExecute: timeout")); } } catch (WaitWithCondition.ConditionMetException) { throw new CanceledException(); } // we didn't timeout, check the result switch (ComReport.Result) { case CommunicationConstantsAndEnums.CommunicationResult.Canceled: throw new CanceledException(); case CommunicationConstantsAndEnums.CommunicationResult.ReceiveOK: if (baseResponse == null) { LogString("SyncExecute: ReceiveOK but response==null!"); LogCommand(false); throw new Exception(MakeLogString("SyncExecute: ReceiveOK but response==null!")); } if (baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusSlicebusNoResponse) { SurfaceError($"Possible hardware issue detected on {recorder.SerialNumber} [SliceBusNoResponse]. Please contact DTS support"); } if (baseResponse.Status != DFConstantsAndEnums.CommandStatus.StatusNoError) { // didn't go well var msg = MakeLogString("SyncExecute: response.Status = " + baseResponse.Status); LogCommand(false); if (baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidModeForCommand || baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidCommand) { throw new CommandException(CommandErrorReason.InvalidMode, msg); } if (baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusUnimplemented || baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidCommand || baseResponse.Status == DFConstantsAndEnums.CommandStatus.StatusInvalidCommandType) { throw new NotImplementedException(msg); } var ex = new Exception(msg); ex.Data.Add("Status", baseResponse.Status); throw ex; } // everything is fine, let it exit if (LogCommands) { LogCommand(false); } break; case CommunicationConstantsAndEnums.CommunicationResult.ReceiveFailed: { var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result); APILogger.LogString(msg); LogCommand(false); throw new CommandException(CommandErrorReason.ReceiveFailed, msg); } case CommunicationConstantsAndEnums.CommunicationResult.ReceiveTimeout: { var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result); APILogger.LogString(msg); LogCommand(false); throw new CommandException(CommandErrorReason.ReceiveFailed, msg); } case CommunicationConstantsAndEnums.CommunicationResult.SendFailed: case CommunicationConstantsAndEnums.CommunicationResult.SendTimeout: { var msg = MakeLogString("SyncExecute: ComReport.Result == " + ComReport.Result); APILogger.LogString(msg); LogCommand(false); throw new CommandException(CommandErrorReason.SendFailed, msg); } default: { var msg = MakeLogString("SyncExecute: Unknown ComReport.Result == " + ComReport.Result); APILogger.LogString(msg); LogCommand(false); throw new Exception(msg); } } } finally { recorder.ExecuteIsBusy = false; } } public virtual void SyncExecute() { try { InternalSyncExecute(); // if we get here, everything is fine return; } catch (TimeoutException te) { // and again LogString("SyncExecute: timeout"); //the retry here was causing a lot of issues //seems better to just accept the death of the command and re-issue as needed throw te; } } public virtual void Execute(CommandCallback cb, object cbData) { // this try/catch is to only handle ExecuteIsBusy try { // there can be only one! recorder.ExecuteIsBusy = true; if (!recorder.Connected) { // "Slice.CommandBase.Execute: No recorder connected" throw new System.Exception(Strings.Slice_CommandBase_Execute_Err1); } if (cb == null) { // "Slice.CommandBase.Execute: Callback can't be null" throw new ArgumentException(Strings.Slice_CommandBase_Execute_Err2); } if (recorder.ProtocolVersion < MinimumProtocolVersion) { // "Slice.CommandBase.Execute: The recorder's protocol version does not support this command" throw new System.Exception(Strings.Slice_CommandBase_Execute_Err3); } if (recorder.IsCanceled()) { throw new CanceledException(); } // this is the tail end of execute, do some bookeeping first //Debug.Assert(false == command.AlreadyRun); if (baseCommand.AlreadyRun) { baseCommand.GetNextSequenceNumber(); } baseCommand.AlreadyRun = true; baseCommand.ComputeCRCs(); var CommandBytes = baseCommand.ToBytes(); baseCommand.OriginalBytes = CommandBytes; UserCallback = cb; UserCallbackData = cbData; IsSynchronous = false; LogCommand(true); ExecuteTime = DateTime.Now; recorder.Execute(CommandBytes, ExecuteCallback, null, IO_Timeout); } catch (System.Exception ex) { APILogger.Log(ex); // if an exception has happend the execute is actually started and execute is therefore not busy. // After execute starts the only responses we get is thru ExecuteCallback recorder.ExecuteIsBusy = false; throw; } } /// /// returns a commandpacket object /// this is necessary because commandpackets are specific to inheritted classes /// and can contain different packing of bytes and different verification schemes /// an example would be /// /// /// protected abstract CommandPacketBase GetCommandPacket(byte[] buffer); protected abstract CommandPacketBase GetCommandPacket(); /// /// allows inheriting classes to add their own log statements to entries /// when overriding, be sure to call base class implementation /// the first line of the two dimensional array is the header information /// from the command (like sequence number, type of command, etc) /// /// public virtual void CommandToString(ref List> lines) { } /// /// allows inheriting classes to add their own log statements to log entries /// when overriding, make sure to call the base class to pick up statements /// added by parent classes /// the base class will add a line if there is an error /// /// public virtual void ResponseToString(ref List> lines) { if (null == baseResponse || baseResponse.Status != DFConstantsAndEnums.CommandStatus.StatusNoError) { lines.Add(new List() { $"XXXXX ERROR XXXXX {baseResponse.Status.ToString()}" }); } } public const string RESPONSESTART_STRING = "<- "; private const string SPACE_STRING = " "; public const string COMMANDSTART_STRING = "-> "; /// /// outputs a two dimensional array of log information into a single string /// using a two dimensional array means that subclasses don't need to care /// about how much data was already put into the arrays, they can just add /// whatever data they need to report /// /// two dimensional array of log data /// /// private string FormatLogEntry(List> lines, bool bSending) { StringBuilder sb = new StringBuilder(200); for (var i = 0; i < lines.Count; i++) { if (0 == i) { sb.Append(bSending ? COMMANDSTART_STRING : RESPONSESTART_STRING); } else { sb.Append(SPACE_STRING); } for (var j = 0; j < lines[i].Count; j++) { if (j > 0) { sb.Append(", "); } sb.Append(lines[i][j]); } } return sb.ToString(); } /// /// outputs a single string for logging /// /// log outgoing or ingoing command /// private string GetFormattedLogEntry(bool bSending) { var lines = new List>(); if (bSending) { baseCommand.GetPacketLogHeader(ref lines); CommandToString(ref lines); } else { if (null != baseResponse) { baseResponse.GetPacketLogHeader(baseCommand, ref lines, ExecuteTime); ResponseToString(ref lines); } } return FormatLogEntry(lines, bSending); } } }