init
This commit is contained in:
@@ -0,0 +1,326 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using DTS.Common.Enums;
|
||||
using DTS.Common.Enums.DASFactory;
|
||||
using DTS.Common.Utilities.Logging;
|
||||
|
||||
namespace DTS.DASLib.DASFactory
|
||||
{
|
||||
public class DistributorSocket : IDisposable
|
||||
{
|
||||
#region constants and enums
|
||||
private const int SLICE_DB_PORT = 8200;
|
||||
private const int MAX_LOG_SIZE_BYTES = 1024000;
|
||||
private const int MAX_CONSECUTIVE_READ_ERRORS = 2;
|
||||
#endregion
|
||||
|
||||
#region properties
|
||||
private Socket _sock;
|
||||
private readonly StreamReader _reader;
|
||||
private readonly StreamWriter _writer;
|
||||
private readonly TextLogger _logger;
|
||||
private static volatile bool _bShutDownByNow = false;
|
||||
#endregion
|
||||
|
||||
#region methods
|
||||
private static void WriteCycleExceptionHandler(Exception ex)
|
||||
{
|
||||
try
|
||||
{
|
||||
APILogger.Log("exception writing to Hearbeat log", ex);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// ignored - if we have an exception here we've already tried to log it, if we fail to log it, there's not much more to do
|
||||
}
|
||||
}
|
||||
private void Log(string message)
|
||||
{
|
||||
if (null == _logger) { return; }
|
||||
try
|
||||
{
|
||||
_logger.LogMessage($"{DateTime.Now.Year:0000}-{DateTime.Now.Month:00}-{DateTime.Now.Day:00} {DateTime.Now.Hour:00}:{DateTime.Now.Minute:00}:{DateTime.Now.Second:00}.{DateTime.Now.Millisecond:0000} {message}\r\n");
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// ignored - we just tried to log a problem, but the log failed, not much more we can do
|
||||
}
|
||||
}
|
||||
|
||||
public bool IsConnected()
|
||||
{
|
||||
return null != _sock && _sock.Connected;
|
||||
}
|
||||
|
||||
public void Disconnect()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (null == _sock) return;
|
||||
if (_sock.Connected)
|
||||
{
|
||||
_sock.Shutdown(SocketShutdown.Both);
|
||||
_sock.Close();
|
||||
}
|
||||
|
||||
_sock.Dispose();
|
||||
_sock = null;
|
||||
_keepAliveEnabled = false;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
APILogger.Log(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public bool ReadLine(ref string target, ref bool stopFlag)
|
||||
{
|
||||
var consecutiveReadErrors = 0;
|
||||
string currentMessage;
|
||||
|
||||
// retry the reads until stopFlag is true or we get a message or we've tried 10 times
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
currentMessage = _reader.ReadLine();
|
||||
if (null == currentMessage)
|
||||
{
|
||||
currentMessage = string.Empty;
|
||||
}
|
||||
else { Log(currentMessage); }
|
||||
consecutiveReadErrors = 0;
|
||||
}
|
||||
catch (IOException)
|
||||
{
|
||||
consecutiveReadErrors++;
|
||||
currentMessage = string.Empty;
|
||||
}
|
||||
} while (!stopFlag && string.IsNullOrEmpty(currentMessage) && consecutiveReadErrors < MAX_CONSECUTIVE_READ_ERRORS);
|
||||
if (stopFlag || consecutiveReadErrors >= MAX_CONSECUTIVE_READ_ERRORS)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
target = currentMessage;
|
||||
return true;
|
||||
}
|
||||
|
||||
public void SendAck()
|
||||
{
|
||||
Log(@"ACK");
|
||||
if (_sock.Connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
_writer.WriteLine(@"ACK");
|
||||
_writer.Flush();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
APILogger.Log($"Failed to sendAck - {ex.Message} {_sock.RemoteEndPoint.ToString()}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
APILogger.Log($"SendAck not performed, socket is not connected {_sock.RemoteEndPoint.ToString()}");
|
||||
}
|
||||
}
|
||||
|
||||
private bool _keepAliveEnabled = false;
|
||||
|
||||
public bool KeepAliveEnabled()
|
||||
{
|
||||
//10477 Heartbeat and Keep-Alive System Attribute
|
||||
//Keepalive time is the duration between two keepalive transmissions in idle condition. TCP keepalive period is required to be configurable and by default is set to no less than 2 hours.
|
||||
//Keepalive interval is the duration between two successive keepalive retransmissions, if acknowledgement to the previous keepalive transmission is not received.
|
||||
//Keepalive retry is the number of retransmissions to be carried out before declaring that remote end is not available.
|
||||
//if (_keepAliveEnabled) return true;
|
||||
|
||||
Log($@"<{DFConstantsAndEnums.RemoteKeepAliveSeconds},{DFConstantsAndEnums.RemoteKeepAliveRetryIntervalSeconds},4>");
|
||||
_writer.WriteLine($@"<{DFConstantsAndEnums.RemoteKeepAliveSeconds},{DFConstantsAndEnums.RemoteKeepAliveRetryIntervalSeconds},4>");
|
||||
//System.Diagnostics.Trace.WriteLine($"{(_sock?.RemoteEndPoint as IPEndPoint)?.Address}:{(_sock?.RemoteEndPoint as IPEndPoint)?.Port} Keepalive Message Sent");
|
||||
var response = "";
|
||||
while (string.IsNullOrWhiteSpace(response))
|
||||
{
|
||||
response = _reader.ReadLine();
|
||||
}
|
||||
Log(response);
|
||||
//if (response.Contains("<ACK>"))
|
||||
//{
|
||||
// return true;
|
||||
//}
|
||||
SendAck();
|
||||
//System.Diagnostics.Trace.WriteLine($"{(_sock?.RemoteEndPoint as IPEndPoint)?.Address}:{(_sock?.RemoteEndPoint as IPEndPoint)?.Port} Keepalive Ack Sent");
|
||||
_keepAliveEnabled = true;
|
||||
return _keepAliveEnabled;
|
||||
}
|
||||
public void SendNak()
|
||||
{
|
||||
if (null != _sock && _sock.Connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
Log(@"NAK");
|
||||
// Respond with ACK or NAK -- NAK doesn't necessarily mean something bad happened, it
|
||||
// just means the message received wasn't an in-band message. The SD can send a 0-length
|
||||
// packet, e.g., as a keep-alive.
|
||||
_writer.WriteLine(@"NAK");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
APILogger.Log($"Failed to send nak - {ex.Message} - {_sock?.RemoteEndPoint.ToString()}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (null == _sock)
|
||||
{
|
||||
APILogger.Log($"failed to send nak, no sock");
|
||||
}
|
||||
else { APILogger.Log($"Failed to send nak, not connected - {_sock?.RemoteEndPoint.ToString()}"); }
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region constructors and initializers
|
||||
public DistributorSocket(string hostName, string hostIPAddress, ref bool slicedbCanConnect, ref ManualResetEvent slicedbCanConnectEvent, string logFolder, ManualResetEvent whKillMe)
|
||||
{
|
||||
//try
|
||||
//{
|
||||
// _logger = new TextLogger(string.Format("{1}\\{0}.log", hostName, logFolder).Replace(":", "_"), WriteCycleExceptionHandler, MAX_LOG_SIZE_BYTES);
|
||||
//}
|
||||
//catch (Exception ex) { APILogger.Log("exception setting up heartbeat logger", ex); }
|
||||
_keepAliveEnabled = false;
|
||||
_sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
|
||||
_sock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
|
||||
_sock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
|
||||
|
||||
// https://benohead.com/windows-network-connections-timing-quickly-temporary-connectivity-loss/
|
||||
|
||||
//KeepAliveTime: default value is 2hr
|
||||
//KeepAliveInterval: default value is 1s and Detect 5 times
|
||||
uint dummy = 0; //lenth = 4
|
||||
var inOptionValues = new byte[System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 3];
|
||||
var onOff = true;
|
||||
var keepAliveTimeOutMs = DFConstantsAndEnums.LocalKeepAliveTimeOutMS;
|
||||
var keepAliveRetryIntervalMs = DFConstantsAndEnums.LocalKeepAliveRetryIntervalMS;
|
||||
|
||||
BitConverter.GetBytes((uint)(onOff ? 1 : 0)).CopyTo(inOptionValues, 0);
|
||||
BitConverter.GetBytes(keepAliveTimeOutMs)
|
||||
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy));
|
||||
BitConverter.GetBytes(keepAliveRetryIntervalMs)
|
||||
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 2);
|
||||
_sock.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
|
||||
|
||||
if (!string.IsNullOrEmpty(hostIPAddress))
|
||||
{
|
||||
APILogger.Log($"DistributorSocket.DistributorSocket binding to {hostIPAddress}");
|
||||
_sock.Bind(new IPEndPoint(IPAddress.Parse(hostIPAddress), 0));
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
// Wait until we're allowed to connect
|
||||
while (!slicedbCanConnect)
|
||||
{
|
||||
if (whKillMe.WaitOne(10, false)) { return; }
|
||||
if (_sock.Connected) { _sock.Disconnect(false); _sock.Close(); }
|
||||
slicedbCanConnectEvent.WaitOne(1000, false);
|
||||
slicedbCanConnectEvent.Reset();
|
||||
if (_bShutDownByNow) { return; }
|
||||
}
|
||||
if (whKillMe.WaitOne(10, false)) { return; }
|
||||
if (_bShutDownByNow) { return; }
|
||||
// Try to connect to a slice db
|
||||
//sock.Connect(hostName, slice_db_port);
|
||||
var result = _sock.BeginConnect(hostName, SLICE_DB_PORT, null, null);
|
||||
result.AsyncWaitHandle.WaitOne(2000, true);
|
||||
if (_sock.Connected) continue;
|
||||
_sock.Close(); Thread.Sleep(100);
|
||||
|
||||
_sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
|
||||
_sock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
|
||||
|
||||
// https://benohead.com/windows-network-connections-timing-quickly-temporary-connectivity-loss/
|
||||
|
||||
//KeepAliveTime: default value is 2hr
|
||||
//KeepAliveInterval: default value is 1s and Detect 5 times
|
||||
dummy = 0; //lenth = 4
|
||||
inOptionValues = new byte[System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 3];
|
||||
keepAliveTimeOutMs = DFConstantsAndEnums.LocalKeepAliveTimeOutMS;
|
||||
keepAliveRetryIntervalMs = DFConstantsAndEnums.LocalKeepAliveRetryIntervalMS;
|
||||
|
||||
BitConverter.GetBytes((uint)(onOff ? 1 : 0)).CopyTo(inOptionValues, 0);
|
||||
BitConverter.GetBytes(keepAliveTimeOutMs)
|
||||
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy));
|
||||
BitConverter.GetBytes(keepAliveRetryIntervalMs)
|
||||
.CopyTo(inOptionValues, System.Runtime.InteropServices.Marshal.SizeOf(dummy) * 2);
|
||||
_sock.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
|
||||
_sock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
|
||||
if (!string.IsNullOrEmpty(hostIPAddress))
|
||||
{
|
||||
APILogger.Log($"DistributorSocket.DistributorSocket binding to {hostIPAddress}");
|
||||
_sock.Bind(new IPEndPoint(IPAddress.Parse(hostIPAddress), 0));
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// Need to sleep here in so we don't chew up CPU when
|
||||
// there isn't a network interface available (BUG 1104)
|
||||
Thread.Sleep(500);
|
||||
}
|
||||
} while (!_sock.Connected && !whKillMe.WaitOne(10, false));
|
||||
if (!_sock.Connected) return;
|
||||
_sock.ReceiveBufferSize = 1024;
|
||||
|
||||
_reader = new StreamReader(new NetworkStream(_sock));
|
||||
_writer = new StreamWriter(new NetworkStream(_sock)) { AutoFlush = true };
|
||||
|
||||
_writer.BaseStream.WriteTimeout = 9000;
|
||||
_reader.BaseStream.ReadTimeout = 9000;
|
||||
}
|
||||
|
||||
public DistributorSocket(Socket sock, string hostName, string logFolder)
|
||||
{
|
||||
_keepAliveEnabled = false;
|
||||
//try
|
||||
//{
|
||||
// _logger = new TextLogger(string.Format("{1}\\{0}.log", hostName, logFolder).Replace(":", "_"), WriteCycleExceptionHandler, MAX_LOG_SIZE_BYTES);
|
||||
//}
|
||||
//catch (Exception ex) { APILogger.Log("exception setting up heartbeat logger", ex); }
|
||||
|
||||
_sock = sock;
|
||||
_reader = new StreamReader(new NetworkStream(_sock));
|
||||
_writer = new StreamWriter(new NetworkStream(_sock)) { AutoFlush = true };
|
||||
|
||||
_writer.BaseStream.WriteTimeout = 9000;
|
||||
_reader.BaseStream.ReadTimeout = 9000;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (null == _sock) return;
|
||||
if (_sock.Connected)
|
||||
{
|
||||
_sock.Shutdown(SocketShutdown.Both);
|
||||
_sock.Close();
|
||||
}
|
||||
|
||||
_sock.Dispose();
|
||||
_sock = null;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// don't care
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user