diff --git a/src/WatsonTcp/WatsonTcpClient.cs b/src/WatsonTcp/WatsonTcpClient.cs index 7593231..24c8a18 100644 --- a/src/WatsonTcp/WatsonTcpClient.cs +++ b/src/WatsonTcp/WatsonTcpClient.cs @@ -152,7 +152,7 @@ public ISerializationHelper SerializationHelper private string _SourceIp = null; private int _SourcePort = 0; private string _ServerIp = null; - private int _ServerPort = 0; + private int _ServerPort = 0; private TcpClient _Client = null; private Stream _DataStream = null; @@ -192,7 +192,7 @@ public WatsonTcpClient( { if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp)); if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort)); - + _Mode = Mode.Tcp; _ServerIp = serverIp; _ServerPort = serverPort; @@ -218,7 +218,7 @@ public WatsonTcpClient( { if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp)); if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort)); - + _Mode = Mode.Ssl; _TlsVersion = tlsVersion; _ServerIp = serverIp; @@ -257,15 +257,15 @@ public WatsonTcpClient( /// The SSL certificate /// The TLS version used for this conenction. public WatsonTcpClient( - string serverIp, - int serverPort, + string serverIp, + int serverPort, X509Certificate2 cert, TlsVersion tlsVersion = TlsVersion.Tls12) { if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp)); if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort)); if (cert == null) throw new ArgumentNullException(nameof(cert)); - + _Mode = Mode.Ssl; _TlsVersion = tlsVersion; _SslCertificate = cert; @@ -285,13 +285,13 @@ public WatsonTcpClient( #region Public-Methods - /// + /// /// Disconnect the client and dispose of background workers. /// Do not reuse the object after disposal. /// public void Dispose() { - Dispose(true); + Dispose(true); GC.SuppressFinalize(this); } @@ -495,7 +495,7 @@ public void Disconnect(bool sendNotice = true) _Token = default(CancellationToken); } } - + if (_SslStream != null) { _SslStream.Close(); @@ -573,7 +573,7 @@ public async Task SendAsync(byte[] data, Dictionary metada WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream); return await SendAsync(contentLength, stream, metadata, token).ConfigureAwait(false); } - + /// /// Send data and metadata to the server from a stream asynchronously. /// @@ -675,7 +675,7 @@ protected virtual void Dispose(bool disposing) if (_ReadLock != null) _ReadLock.Dispose(); - _Settings = null; + Settings = null; _Events = null; _Callbacks = null; _Statistics = null; @@ -696,7 +696,7 @@ protected virtual void Dispose(bool disposing) _ReadLock = null; _DataReceiver = null; - } + } } #region Connection @@ -725,10 +725,10 @@ private void EnableKeepalives() // .NET Framework expects values in milliseconds - byte[] keepAlive = new byte[12]; - Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4); - Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4); - Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4); + byte[] keepAlive = new byte[12]; + Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4); + Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4); + Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4); _Client.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive, null); #elif NETSTANDARD @@ -755,7 +755,7 @@ private async Task DataReceiver(CancellationToken token) try { token.ThrowIfCancellationRequested(); - + #region Check-for-Connection if (_Client == null || !_Client.Connected) @@ -771,7 +771,7 @@ private async Task DataReceiver(CancellationToken token) await _ReadLock.WaitAsync(token); WatsonMessage msg = await _MessageBuilder.BuildFromStream(_DataStream, token); if (msg == null) - { + { await Task.Delay(30, token).ConfigureAwait(false); continue; } @@ -815,7 +815,7 @@ private async Task DataReceiver(CancellationToken token) } else if (msg.Status == MessageStatus.AuthRequired) { - _Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key"); + _Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key"); string psk = _Callbacks.HandleAuthenticationRequested(); if (!String.IsNullOrEmpty(psk)) await AuthenticateAsync(psk, token); continue; @@ -828,7 +828,7 @@ private async Task DataReceiver(CancellationToken token) if (msg.SyncRequest) { _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request received: " + msg.ConversationGuid.ToString()); - + DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg); byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); @@ -871,17 +871,17 @@ private async Task DataReceiver(CancellationToken token) respMsg.ConversationGuid = msg.ConversationGuid; await SendInternalAsync(respMsg, contentLength, stream, token).ConfigureAwait(false); } - }, _Token); + }, _Token); } else - { + { _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded"); - } + } } else if (msg.SyncResponse) { // No need to amend message expiration; it is copied from the request, which was set by this node - // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg); + // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg); _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous response received: " + msg.ConversationGuid.ToString()); byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); @@ -902,8 +902,8 @@ private async Task DataReceiver(CancellationToken token) byte[] msgData = null; if (_Events.IsUsingMessages) - { - msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); + { + msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); MessageReceivedEventArgs args = new MessageReceivedEventArgs(null, msg.Metadata, msgData); await Task.Run(() => _Events.HandleMessageReceived(this, args)); } @@ -915,16 +915,16 @@ private async Task DataReceiver(CancellationToken token) if (msg.ContentLength >= _Settings.MaxProxiedStreamSize) { ws = new WatsonStream(msg.ContentLength, msg.DataStream); - sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws); + sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws); _Events.HandleStreamReceived(this, sr); } else { MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false); ws = new WatsonStream(msg.ContentLength, ms); - sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws); + sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws); Task unawaited = Task.Run(() => _Events.HandleStreamReceived(this, sr), token); - } + } } else { @@ -968,7 +968,7 @@ private async Task DataReceiver(CancellationToken token) _Header + "data receiver exception for " + _ServerIp + ":" + _ServerPort + ": " + e.Message + Environment.NewLine); _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e)); break; - } + } finally { if (_ReadLock != null) _ReadLock.Release(); @@ -991,7 +991,7 @@ private async Task SendInternalAsync(WatsonMessage msg, long contentLength { if (msg == null) throw new ArgumentNullException(nameof(msg)); if (!Connected) return false; - + if (contentLength > 0 && (stream == null || !stream.CanRead)) { throw new ArgumentException("Cannot read from supplied stream."); @@ -1004,17 +1004,17 @@ private async Task SendInternalAsync(WatsonMessage msg, long contentLength } bool disconnectDetected = false; - + if (_Client == null || !_Client.Connected) { return false; } - + await _WriteLock.WaitAsync(token).ConfigureAwait(false); try - { - await SendHeadersAsync(msg, token).ConfigureAwait(false); + { + await SendHeadersAsync(msg, token).ConfigureAwait(false); await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false); _Statistics.IncrementSentMessages(); @@ -1042,7 +1042,7 @@ private async Task SendInternalAsync(WatsonMessage msg, long contentLength } finally { - _WriteLock.Release(); + _WriteLock.Release(); if (disconnectDetected) { @@ -1051,23 +1051,23 @@ private async Task SendInternalAsync(WatsonMessage msg, long contentLength } } } - + private async Task SendAndWaitInternalAsync(WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token) { - if (msg == null) throw new ArgumentNullException(nameof(msg)); + if (msg == null) throw new ArgumentNullException(nameof(msg)); if (!Connected) throw new InvalidOperationException("Client is not connected to the server."); if (contentLength > 0 && (stream == null || !stream.CanRead)) throw new ArgumentException("Cannot read from supplied stream."); bool disconnectDetected = false; - + if (_Client == null || !_Client.Connected) { disconnectDetected = true; throw new InvalidOperationException("Client is not connected to the server."); } - + await _WriteLock.WaitAsync(token).ConfigureAwait(false); SyncResponse ret = null; @@ -1083,7 +1083,7 @@ private async Task SendAndWaitInternalAsync(WatsonMessage msg, int } }; - // Subscribe + // Subscribe _SyncResponseReceived += handler; try @@ -1124,7 +1124,7 @@ private async Task SendAndWaitInternalAsync(WatsonMessage msg, int // Wait for responded.Set() to be called responded.WaitOne(new TimeSpan(0,0,0,0, timeoutMs)); - // Unsubscribe + // Unsubscribe _SyncResponseReceived -= handler; if (ret != null) @@ -1141,11 +1141,11 @@ private async Task SendAndWaitInternalAsync(WatsonMessage msg, int private async Task SendHeadersAsync(WatsonMessage msg, CancellationToken token) { msg.SenderGuid = _Settings.Guid; - byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg); + byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg); await _DataStream.WriteAsync(headerBytes, 0, headerBytes.Length, token).ConfigureAwait(false); await _DataStream.FlushAsync(token).ConfigureAwait(false); } - + private async Task SendDataStreamAsync(long contentLength, Stream stream, CancellationToken token) { if (contentLength <= 0) return; @@ -1170,7 +1170,7 @@ private async Task SendDataStreamAsync(long contentLength, Stream stream, Cancel await _DataStream.WriteAsync(_SendBuffer, 0, bytesRead, token).ConfigureAwait(false); bytesRemaining -= bytesRead; } - } + } await _DataStream.FlushAsync(token).ConfigureAwait(false); } diff --git a/src/WatsonTcp/WatsonTcpServer.cs b/src/WatsonTcp/WatsonTcpServer.cs index e17992d..4ac60c0 100644 --- a/src/WatsonTcp/WatsonTcpServer.cs +++ b/src/WatsonTcp/WatsonTcpServer.cs @@ -22,7 +22,7 @@ public class WatsonTcpServer : IDisposable { #region Public-Members - + /// /// Watson TCP server settings. /// @@ -38,7 +38,7 @@ public WatsonTcpServerSettings Settings else _Settings = value; } } - + /// /// Watson TCP server events. /// @@ -195,7 +195,7 @@ public bool IsListening /// /// Initialize the Watson TCP server without SSL. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address. - /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. + /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. /// Call Start() afterward to start the server. /// /// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges). @@ -206,7 +206,7 @@ public WatsonTcpServer( { if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort)); - _Mode = Mode.Tcp; + _Mode = Mode.Tcp; // According to the https://github.com/dotnet/WatsonTcp?tab=readme-ov-file#local-vs-external-connections if (string.IsNullOrEmpty(listenerIp) || listenerIp.Equals("*") || listenerIp.Equals("+") || listenerIp.Equals("0.0.0.0")) @@ -231,9 +231,9 @@ public WatsonTcpServer( } /// - /// Initialize the Watson TCP server with SSL. + /// Initialize the Watson TCP server with SSL. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address. - /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. + /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. /// Call Start() afterward to start the server. /// /// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges). @@ -250,7 +250,7 @@ public WatsonTcpServer( { if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort)); if (String.IsNullOrEmpty(pfxCertFile)) throw new ArgumentNullException(nameof(pfxCertFile)); - + _Mode = Mode.Ssl; _TlsVersion = tlsVersion; @@ -270,7 +270,7 @@ public WatsonTcpServer( _ListenerIp = listenerIp; } - _SslCertificate = null; + _SslCertificate = null; if (String.IsNullOrEmpty(pfxCertPass)) { _SslCertificate = new X509Certificate2(pfxCertFile); @@ -286,9 +286,9 @@ public WatsonTcpServer( } /// - /// Initialize the Watson TCP server with SSL. + /// Initialize the Watson TCP server with SSL. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address. - /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. + /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges. /// Call Start() afterward to start the server. /// /// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges). @@ -378,7 +378,7 @@ public void Start() _MonitorClients = Task.Run(() => MonitorForIdleClients(_Token), _Token); _Events.HandleServerStarted(this, EventArgs.Empty); } - + /// /// Stop accepting connections. /// @@ -602,7 +602,7 @@ public async Task DisconnectClientsAsync(MessageStatus status = MessageStatus.Re /// /// Indicate if resources should be disposed. protected virtual void Dispose(bool disposing) - { + { if (disposing) { _Settings.Logger?.Invoke(Severity.Info, _Header + "disposing"); @@ -630,7 +630,7 @@ protected virtual void Dispose(bool disposing) _ClientManager.Dispose(); } - _Settings = null; + Settings = null; _Events = null; _Callbacks = null; _Statistics = null; @@ -648,8 +648,8 @@ protected virtual void Dispose(bool disposing) _AcceptConnections = null; _MonitorClients = null; - _IsListening = false; - } + _IsListening = false; + } } #region Connection @@ -678,10 +678,10 @@ private void EnableKeepalives(TcpClient client) // .NET Framework expects values in milliseconds - byte[] keepAlive = new byte[12]; - Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4); - Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4); - Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4); + byte[] keepAlive = new byte[12]; + Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4); + Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4); + Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4); client.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive, null); #elif NETSTANDARD @@ -696,11 +696,11 @@ private void EnableKeepalives(TcpClient client) } private async Task AcceptConnections(CancellationToken token) - { + { _IsListening = true; while (true) - { + { try { token.ThrowIfCancellationRequested(); @@ -824,7 +824,7 @@ private async Task AcceptConnections(CancellationToken token) _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e)); break; } - } + } } private async Task StartTls(ClientMetadata client, CancellationToken token) @@ -854,7 +854,7 @@ private async Task StartTls(ClientMetadata client, CancellationToken token if (_Settings.MutuallyAuthenticate && !client.SslStream.IsMutuallyAuthenticated) { _Settings.Logger?.Invoke(Severity.Error, _Header + $"mutual authentication with {client.ToString()} ({_TlsVersion}) failed"); - client.Dispose(); + client.Dispose(); Interlocked.Decrement(ref _Connections); return false; } @@ -864,16 +864,16 @@ private async Task StartTls(ClientMetadata client, CancellationToken token _Settings.Logger?.Invoke(Severity.Error, _Header + $"disconnected during SSL/TLS establishment with {client.ToString()} ({_TlsVersion}): " + e.Message); _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e)); - client.Dispose(); + client.Dispose(); Interlocked.Decrement(ref _Connections); return false; - } + } return true; } private async Task FinalizeConnection(ClientMetadata client, CancellationToken token) - { + { #region Request-Authentication if (!String.IsNullOrEmpty(_Settings.PresharedKey)) @@ -894,7 +894,7 @@ private async Task FinalizeConnection(ClientMetadata client, CancellationToken t _Settings.Logger?.Invoke(Severity.Debug, _Header + "starting data receiver for " + client.ToString()); client.DataReceiver = Task.Run(() => DataReceiver(client, token), token); - #endregion + #endregion } private bool IsClientConnected(ClientMetadata client) @@ -1001,7 +1001,7 @@ private async Task DataReceiver(ClientMetadata client, CancellationToken token) await Task.Delay(30, token).ConfigureAwait(false); continue; } - + if (!String.IsNullOrEmpty(_Settings.PresharedKey)) { if (_ClientManager.ExistsUnauthenticatedClient(client.Guid)) @@ -1068,7 +1068,7 @@ private async Task DataReceiver(ClientMetadata client, CancellationToken token) _Events.HandleClientConnected(this, new ConnectionEventArgs(client)); continue; } - + if (msg.SyncRequest) { _Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous request received: " + msg.ConversationGuid.ToString()); @@ -1117,14 +1117,14 @@ private async Task DataReceiver(ClientMetadata client, CancellationToken token) }, token); } else - { + { _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded from " + client.ToString()); - } + } } else if (msg.SyncResponse) { // No need to amend message expiration; it is copied from the request, which was set by this node - // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg); + // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg); _Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous response received: " + msg.ConversationGuid.ToString()); byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); @@ -1142,11 +1142,11 @@ private async Task DataReceiver(ClientMetadata client, CancellationToken token) } else { - byte[] msgData = null; + byte[] msgData = null; if (_Events.IsUsingMessages) - { - msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); + { + msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false); MessageReceivedEventArgs mr = new MessageReceivedEventArgs(client, msg.Metadata, msgData); await Task.Run(() => _Events.HandleMessageReceived(this, mr), token); } @@ -1159,15 +1159,15 @@ private async Task DataReceiver(ClientMetadata client, CancellationToken token) { ws = new WatsonStream(msg.ContentLength, msg.DataStream); sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws); - _Events.HandleStreamReceived(this, sr); + _Events.HandleStreamReceived(this, sr); } else { MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false); - ws = new WatsonStream(msg.ContentLength, ms); + ws = new WatsonStream(msg.ContentLength, ms); sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws); await Task.Run(() => _Events.HandleStreamReceived(this, sr), token); - } + } } else { @@ -1310,7 +1310,7 @@ private async Task SendAndWaitInternalAsync(ClientMetadata client, } }; - // Subscribe + // Subscribe _SyncResponseReceived += handler; try @@ -1337,7 +1337,7 @@ private async Task SendAndWaitInternalAsync(ClientMetadata client, // Wait for responded.Set() to be called responded.WaitOne(new TimeSpan(0, 0, 0, 0, timeoutMs)); - // Unsubscribe + // Unsubscribe _SyncResponseReceived -= handler; if (ret != null) @@ -1352,12 +1352,12 @@ private async Task SendAndWaitInternalAsync(ClientMetadata client, } private async Task SendHeadersAsync(ClientMetadata client, WatsonMessage msg, CancellationToken token) - { + { byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg); await client.DataStream.WriteAsync(headerBytes, 0, headerBytes.Length, token).ConfigureAwait(false); await client.DataStream.FlushAsync(token).ConfigureAwait(false); } - + private async Task SendDataStreamAsync(ClientMetadata client, long contentLength, Stream stream, CancellationToken token) { if (contentLength <= 0) return; @@ -1382,7 +1382,7 @@ private async Task SendDataStreamAsync(ClientMetadata client, long contentLength await client.DataStream.WriteAsync(client.SendBuffer, 0, bytesRead, token).ConfigureAwait(false); bytesRemaining -= bytesRead; } - } + } await client.DataStream.FlushAsync(token).ConfigureAwait(false); } @@ -1421,7 +1421,7 @@ private async Task MonitorForIdleClients(CancellationToken token) } } } - } + } } } catch (TaskCanceledException) @@ -1433,7 +1433,7 @@ private async Task MonitorForIdleClients(CancellationToken token) } } - + #endregion #endregion