Skip to content

Reset _Settings to defaults rather than to null #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions src/WatsonTcp/WatsonTcpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public ISerializationHelper SerializationHelper
private string _SourceIp = null;
private int _SourcePort = 0;
private string _ServerIp = null;
private int _ServerPort = 0;
private int _ServerPort = 0;

private TcpClient _Client = null;
private Stream _DataStream = null;
Expand Down Expand Up @@ -192,7 +192,7 @@ public WatsonTcpClient(
{
if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));
if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort));

_Mode = Mode.Tcp;
_ServerIp = serverIp;
_ServerPort = serverPort;
Expand All @@ -218,7 +218,7 @@ public WatsonTcpClient(
{
if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));
if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort));

_Mode = Mode.Ssl;
_TlsVersion = tlsVersion;
_ServerIp = serverIp;
Expand Down Expand Up @@ -257,15 +257,15 @@ public WatsonTcpClient(
/// <param name="cert">The SSL certificate</param>
/// <param name="tlsVersion">The TLS version used for this conenction.</param>
public WatsonTcpClient(
string serverIp,
int serverPort,
string serverIp,
int serverPort,
X509Certificate2 cert,
TlsVersion tlsVersion = TlsVersion.Tls12)
{
if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));
if (serverPort < 0) throw new ArgumentOutOfRangeException(nameof(serverPort));
if (cert == null) throw new ArgumentNullException(nameof(cert));

_Mode = Mode.Ssl;
_TlsVersion = tlsVersion;
_SslCertificate = cert;
Expand All @@ -285,13 +285,13 @@ public WatsonTcpClient(

#region Public-Methods

/// <summary>
/// <summary>
/// Disconnect the client and dispose of background workers.
/// Do not reuse the object after disposal.
/// </summary>
public void Dispose()
{
Dispose(true);
Dispose(true);
GC.SuppressFinalize(this);
}

Expand Down Expand Up @@ -495,7 +495,7 @@ public void Disconnect(bool sendNotice = true)
_Token = default(CancellationToken);
}
}

if (_SslStream != null)
{
_SslStream.Close();
Expand Down Expand Up @@ -573,7 +573,7 @@ public async Task<bool> SendAsync(byte[] data, Dictionary<string, object> metada
WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream);
return await SendAsync(contentLength, stream, metadata, token).ConfigureAwait(false);
}

/// <summary>
/// Send data and metadata to the server from a stream asynchronously.
/// </summary>
Expand Down Expand Up @@ -675,7 +675,7 @@ protected virtual void Dispose(bool disposing)
if (_ReadLock != null)
_ReadLock.Dispose();

_Settings = null;
Settings = null;
_Events = null;
_Callbacks = null;
_Statistics = null;
Expand All @@ -696,7 +696,7 @@ protected virtual void Dispose(bool disposing)
_ReadLock = null;

_DataReceiver = null;
}
}
}

#region Connection
Expand Down Expand Up @@ -725,10 +725,10 @@ private void EnableKeepalives()

// .NET Framework expects values in milliseconds

byte[] keepAlive = new byte[12];
Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4);
byte[] keepAlive = new byte[12];
Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4);
_Client.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive, null);

#elif NETSTANDARD
Expand All @@ -755,7 +755,7 @@ private async Task DataReceiver(CancellationToken token)
try
{
token.ThrowIfCancellationRequested();

#region Check-for-Connection

if (_Client == null || !_Client.Connected)
Expand All @@ -771,7 +771,7 @@ private async Task DataReceiver(CancellationToken token)
await _ReadLock.WaitAsync(token);
WatsonMessage msg = await _MessageBuilder.BuildFromStream(_DataStream, token);
if (msg == null)
{
{
await Task.Delay(30, token).ConfigureAwait(false);
continue;
}
Expand Down Expand Up @@ -815,7 +815,7 @@ private async Task DataReceiver(CancellationToken token)
}
else if (msg.Status == MessageStatus.AuthRequired)
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key");
_Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key");
string psk = _Callbacks.HandleAuthenticationRequested();
if (!String.IsNullOrEmpty(psk)) await AuthenticateAsync(psk, token);
continue;
Expand All @@ -828,7 +828,7 @@ private async Task DataReceiver(CancellationToken token)
if (msg.SyncRequest)
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request received: " + msg.ConversationGuid.ToString());

DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);

Expand Down Expand Up @@ -871,17 +871,17 @@ private async Task DataReceiver(CancellationToken token)
respMsg.ConversationGuid = msg.ConversationGuid;
await SendInternalAsync(respMsg, contentLength, stream, token).ConfigureAwait(false);
}
}, _Token);
}, _Token);
}
else
{
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded");
}
}
}
else if (msg.SyncResponse)
{
// No need to amend message expiration; it is copied from the request, which was set by this node
// DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
// DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
_Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous response received: " + msg.ConversationGuid.ToString());
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);

Expand All @@ -902,8 +902,8 @@ private async Task DataReceiver(CancellationToken token)
byte[] msgData = null;

if (_Events.IsUsingMessages)
{
msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
{
msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
MessageReceivedEventArgs args = new MessageReceivedEventArgs(null, msg.Metadata, msgData);
await Task.Run(() => _Events.HandleMessageReceived(this, args));
}
Expand All @@ -915,16 +915,16 @@ private async Task DataReceiver(CancellationToken token)
if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
{
ws = new WatsonStream(msg.ContentLength, msg.DataStream);
sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
_Events.HandleStreamReceived(this, sr);
}
else
{
MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false);
ws = new WatsonStream(msg.ContentLength, ms);
sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
Task unawaited = Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
}
}
}
else
{
Expand Down Expand Up @@ -968,7 +968,7 @@ private async Task DataReceiver(CancellationToken token)
_Header + "data receiver exception for " + _ServerIp + ":" + _ServerPort + ": " + e.Message + Environment.NewLine);
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
break;
}
}
finally
{
if (_ReadLock != null) _ReadLock.Release();
Expand All @@ -991,7 +991,7 @@ private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength
{
if (msg == null) throw new ArgumentNullException(nameof(msg));
if (!Connected) return false;

if (contentLength > 0 && (stream == null || !stream.CanRead))
{
throw new ArgumentException("Cannot read from supplied stream.");
Expand All @@ -1004,17 +1004,17 @@ private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength
}

bool disconnectDetected = false;

if (_Client == null || !_Client.Connected)
{
return false;
}

await _WriteLock.WaitAsync(token).ConfigureAwait(false);

try
{
await SendHeadersAsync(msg, token).ConfigureAwait(false);
{
await SendHeadersAsync(msg, token).ConfigureAwait(false);
await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false);

_Statistics.IncrementSentMessages();
Expand Down Expand Up @@ -1042,7 +1042,7 @@ private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength
}
finally
{
_WriteLock.Release();
_WriteLock.Release();

if (disconnectDetected)
{
Expand All @@ -1051,23 +1051,23 @@ private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength
}
}
}

private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token)
{
if (msg == null) throw new ArgumentNullException(nameof(msg));
if (msg == null) throw new ArgumentNullException(nameof(msg));
if (!Connected) throw new InvalidOperationException("Client is not connected to the server.");

if (contentLength > 0 && (stream == null || !stream.CanRead))
throw new ArgumentException("Cannot read from supplied stream.");

bool disconnectDetected = false;

if (_Client == null || !_Client.Connected)
{
disconnectDetected = true;
throw new InvalidOperationException("Client is not connected to the server.");
}

await _WriteLock.WaitAsync(token).ConfigureAwait(false);

SyncResponse ret = null;
Expand All @@ -1083,7 +1083,7 @@ private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int
}
};

// Subscribe
// Subscribe
_SyncResponseReceived += handler;

try
Expand Down Expand Up @@ -1124,7 +1124,7 @@ private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int
// Wait for responded.Set() to be called
responded.WaitOne(new TimeSpan(0,0,0,0, timeoutMs));

// Unsubscribe
// Unsubscribe
_SyncResponseReceived -= handler;

if (ret != null)
Expand All @@ -1141,11 +1141,11 @@ private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int
private async Task SendHeadersAsync(WatsonMessage msg, CancellationToken token)
{
msg.SenderGuid = _Settings.Guid;
byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg);
byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg);
await _DataStream.WriteAsync(headerBytes, 0, headerBytes.Length, token).ConfigureAwait(false);
await _DataStream.FlushAsync(token).ConfigureAwait(false);
}

private async Task SendDataStreamAsync(long contentLength, Stream stream, CancellationToken token)
{
if (contentLength <= 0) return;
Expand All @@ -1170,7 +1170,7 @@ private async Task SendDataStreamAsync(long contentLength, Stream stream, Cancel
await _DataStream.WriteAsync(_SendBuffer, 0, bytesRead, token).ConfigureAwait(false);
bytesRemaining -= bytesRead;
}
}
}

await _DataStream.FlushAsync(token).ConfigureAwait(false);
}
Expand Down
Loading