Files
DP44/DataPRO/DASFactory/DistributorSocket.cs
2026-04-17 14:55:32 -04:00

327 lines
14 KiB
C#

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using DTS.Common.Enums;
using DTS.Common.Enums.DASFactory;
using DTS.Common.Utilities.Logging;
namespace DTS.DASLib.DASFactory
{
public class DistributorSocket : IDisposable
{
#region constants and enums
private const int SLICE_DB_PORT = 8200;
private const int MAX_LOG_SIZE_BYTES = 1024000;
private const int MAX_CONSECUTIVE_READ_ERRORS = 2;
#endregion
#region properties
private Socket _sock;
private readonly StreamReader _reader;
private readonly StreamWriter _writer;
private readonly TextLogger _logger;
private static volatile bool _bShutDownByNow = false;
#endregion
#region methods
private static void WriteCycleExceptionHandler(Exception ex)
{
try
{
APILogger.Log("exception writing to Hearbeat log", ex);
}
catch (Exception)
{
// ignored - if we have an exception here we've already tried to log it, if we fail to log it, there's not much more to do
}
}
private void Log(string message)
{
if (null == _logger) { return; }
try
{
_logger.LogMessage($"{DateTime.Now.Year:0000}-{DateTime.Now.Month:00}-{DateTime.Now.Day:00} {DateTime.Now.Hour:00}:{DateTime.Now.Minute:00}:{DateTime.Now.Second:00}.{DateTime.Now.Millisecond:0000} {message}\r\n");
}
catch (Exception)
{
// ignored - we just tried to log a problem, but the log failed, not much more we can do
}
}
public bool IsConnected()
{
return null != _sock && _sock.Connected;
}
public void Disconnect()
{
try
{
if (null == _sock) return;
if (_sock.Connected)
{
_sock.Shutdown(SocketShutdown.Both);
_sock.Close();
}
_sock.Dispose();
_sock = null;
_keepAliveEnabled = false;
}
catch (Exception ex)
{
APILogger.Log(ex);
}
}
public bool ReadLine(ref string target, ref bool stopFlag)
{
var consecutiveReadErrors = 0;
string currentMessage;
// retry the reads until stopFlag is true or we get a message or we've tried 10 times
do
{
try
{
currentMessage = _reader.ReadLine();
if (null == currentMessage)
{
currentMessage = string.Empty;
}
else { Log(currentMessage); }
consecutiveReadErrors = 0;
}
catch (IOException)
{
consecutiveReadErrors++;
currentMessage = string.Empty;
}
} while (!stopFlag && string.IsNullOrEmpty(currentMessage) && consecutiveReadErrors < MAX_CONSECUTIVE_READ_ERRORS);
if (stopFlag || consecutiveReadErrors >= MAX_CONSECUTIVE_READ_ERRORS)
{
return false;
}
target = currentMessage;
return true;
}
public void SendAck()
{
Log(@"ACK");
if (_sock.Connected)
{
try
{
_writer.WriteLine(@"ACK");
_writer.Flush();
}
catch (Exception ex)
{
APILogger.Log($"Failed to sendAck - {ex.Message} {_sock.RemoteEndPoint.ToString()}");
}
}
else
{
APILogger.Log($"SendAck not performed, socket is not connected {_sock.RemoteEndPoint.ToString()}");
}
}
private bool _keepAliveEnabled = false;
public bool KeepAliveEnabled()
{
//10477 Heartbeat and Keep-Alive System Attribute
//Keepalive time is the duration between two keepalive transmissions in idle condition. TCP keepalive period is required to be configurable and by default is set to no less than 2 hours.
//Keepalive interval is the duration between two successive keepalive retransmissions, if acknowledgement to the previous keepalive transmission is not received.
//Keepalive retry is the number of retransmissions to be carried out before declaring that remote end is not available.
//if (_keepAliveEnabled) return true;
Log($@"<{DFConstantsAndEnums.RemoteKeepAliveSeconds},{DFConstantsAndEnums.RemoteKeepAliveRetryIntervalSeconds},4>");
_writer.WriteLine($@"<{DFConstantsAndEnums.RemoteKeepAliveSeconds},{DFConstantsAndEnums.RemoteKeepAliveRetryIntervalSeconds},4>");
//System.Diagnostics.Trace.WriteLine($"{(_sock?.RemoteEndPoint as IPEndPoint)?.Address}:{(_sock?.RemoteEndPoint as IPEndPoint)?.Port} Keepalive Message Sent");
var response = "";
while (string.IsNullOrWhiteSpace(response))
{
response = _reader.ReadLine();
}
Log(response);
//if (response.Contains("<ACK>"))
//{
// return true;
//}
SendAck();
//System.Diagnostics.Trace.WriteLine($"{(_sock?.RemoteEndPoint as IPEndPoint)?.Address}:{(_sock?.RemoteEndPoint as IPEndPoint)?.Port} Keepalive Ack Sent");
_keepAliveEnabled = true;
return _keepAliveEnabled;
}
public void SendNak()
{
if (null != _sock && _sock.Connected)
{
try
{
Log(@"NAK");
// Respond with ACK or NAK -- NAK doesn't necessarily mean something bad happened, it
// just means the message received wasn't an in-band message. The SD can send a 0-length
// packet, e.g., as a keep-alive.
_writer.WriteLine(@"NAK");
}
catch (Exception ex)
{
APILogger.Log($"Failed to send nak - {ex.Message} - {_sock?.RemoteEndPoint.ToString()}");
}
}
else
{
if (null == _sock)
{
APILogger.Log($"failed to send nak, no sock");
}
else { APILogger.Log($"Failed to send nak, not connected - {_sock?.RemoteEndPoint.ToString()}"); }
}
}
#endregion
#region constructors and initializers
public DistributorSocket(string hostName, string hostIPAddress, ref bool slicedbCanConnect, ref ManualResetEvent slicedbCanConnectEvent, string logFolder, ManualResetEvent whKillMe)
{
//try
//{
// _logger = new TextLogger(string.Format("{1}\\{0}.log", hostName, logFolder).Replace(":", "_"), WriteCycleExceptionHandler, MAX_LOG_SIZE_BYTES);
//}
//catch (Exception ex) { APILogger.Log("exception setting up heartbeat logger", ex); }
_keepAliveEnabled = false;
_sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
_sock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
_sock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
// https://benohead.com/windows-network-connections-timing-quickly-temporary-connectivity-loss/
//KeepAliveTime: default value is 2hr
//KeepAliveInterval: default value is 1s and Detect 5 times
uint dummy = 0; //lenth = 4
var inOptionValues = new byte[System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 3];
var onOff = true;
var keepAliveTimeOutMs = DFConstantsAndEnums.LocalKeepAliveTimeOutMS;
var keepAliveRetryIntervalMs = DFConstantsAndEnums.LocalKeepAliveRetryIntervalMS;
BitConverter.GetBytes((uint)(onOff ? 1 : 0)).CopyTo(inOptionValues, 0);
BitConverter.GetBytes(keepAliveTimeOutMs)
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy));
BitConverter.GetBytes(keepAliveRetryIntervalMs)
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 2);
_sock.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
if (!string.IsNullOrEmpty(hostIPAddress))
{
APILogger.Log($"DistributorSocket.DistributorSocket binding to {hostIPAddress}");
_sock.Bind(new IPEndPoint(IPAddress.Parse(hostIPAddress), 0));
}
do
{
try
{
// Wait until we're allowed to connect
while (!slicedbCanConnect)
{
if (whKillMe.WaitOne(10, false)) { return; }
if (_sock.Connected) { _sock.Disconnect(false); _sock.Close(); }
slicedbCanConnectEvent.WaitOne(1000, false);
slicedbCanConnectEvent.Reset();
if (_bShutDownByNow) { return; }
}
if (whKillMe.WaitOne(10, false)) { return; }
if (_bShutDownByNow) { return; }
// Try to connect to a slice db
//sock.Connect(hostName, slice_db_port);
var result = _sock.BeginConnect(hostName, SLICE_DB_PORT, null, null);
result.AsyncWaitHandle.WaitOne(2000, true);
if (_sock.Connected) continue;
_sock.Close(); Thread.Sleep(100);
_sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
_sock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
// https://benohead.com/windows-network-connections-timing-quickly-temporary-connectivity-loss/
//KeepAliveTime: default value is 2hr
//KeepAliveInterval: default value is 1s and Detect 5 times
dummy = 0; //lenth = 4
inOptionValues = new byte[System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 3];
keepAliveTimeOutMs = DFConstantsAndEnums.LocalKeepAliveTimeOutMS;
keepAliveRetryIntervalMs = DFConstantsAndEnums.LocalKeepAliveRetryIntervalMS;
BitConverter.GetBytes((uint)(onOff ? 1 : 0)).CopyTo(inOptionValues, 0);
BitConverter.GetBytes(keepAliveTimeOutMs)
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy));
BitConverter.GetBytes(keepAliveRetryIntervalMs)
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 2);
_sock.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
_sock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
if (!string.IsNullOrEmpty(hostIPAddress))
{
APILogger.Log($"DistributorSocket.DistributorSocket binding to {hostIPAddress}");
_sock.Bind(new IPEndPoint(IPAddress.Parse(hostIPAddress), 0));
}
}
catch (Exception)
{
// Need to sleep here in so we don't chew up CPU when
// there isn't a network interface available (BUG 1104)
Thread.Sleep(500);
}
} while (!_sock.Connected && !whKillMe.WaitOne(10, false));
if (!_sock.Connected) return;
_sock.ReceiveBufferSize = 1024;
_reader = new StreamReader(new NetworkStream(_sock));
_writer = new StreamWriter(new NetworkStream(_sock)) { AutoFlush = true };
_writer.BaseStream.WriteTimeout = 9000;
_reader.BaseStream.ReadTimeout = 9000;
}
public DistributorSocket(Socket sock, string hostName, string logFolder)
{
_keepAliveEnabled = false;
//try
//{
// _logger = new TextLogger(string.Format("{1}\\{0}.log", hostName, logFolder).Replace(":", "_"), WriteCycleExceptionHandler, MAX_LOG_SIZE_BYTES);
//}
//catch (Exception ex) { APILogger.Log("exception setting up heartbeat logger", ex); }
_sock = sock;
_reader = new StreamReader(new NetworkStream(_sock));
_writer = new StreamWriter(new NetworkStream(_sock)) { AutoFlush = true };
_writer.BaseStream.WriteTimeout = 9000;
_reader.BaseStream.ReadTimeout = 9000;
}
public void Dispose()
{
try
{
if (null == _sock) return;
if (_sock.Connected)
{
_sock.Shutdown(SocketShutdown.Both);
_sock.Close();
}
_sock.Dispose();
_sock = null;
}
catch (Exception)
{
// don't care
}
}
#endregion
}
}