Skip to content

Commit 5d4138f

Browse files
authored
Merge pull request #5 from gencebay/master
Multiple SendAsync calls for binary message transport blocking bug fix
2 parents 663b3bb + dd15a55 commit 5d4138f

File tree

10 files changed

+87
-63
lines changed

10 files changed

+87
-63
lines changed

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ private async Task Receive()
3535
_connectionId = context.Value.ToString();
3636

3737
var _invocators = _invocatorRegistry.GetInvocators(context);
38-
_invocators.ForEach(async x => await x.InvokeAsync(context));
38+
foreach (var invoker in _invocators)
39+
{
40+
await invoker.InvokeAsync(context);
41+
}
3942
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
4043
}
4144

@@ -63,7 +66,10 @@ private async Task Receive()
6366
}
6467
var context = result.ToBinaryContext(binaryResult);
6568
var _invocators = _invocatorRegistry.GetInvocators(context);
66-
_invocators.ForEach(async x => await x.InvokeAsync(context));
69+
foreach (var invoker in _invocators)
70+
{
71+
await invoker.InvokeAsync(context);
72+
}
6773
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
6874
}
6975
}
@@ -72,10 +78,23 @@ private async Task Receive()
7278

7379
public async Task ConnectAsync()
7480
{
75-
var uri = new Uri($"ws://{_options.WebSocketHostAddress}");
76-
_webSocket = new ClientWebSocket();
77-
await _webSocket.ConnectAsync(uri, CancellationToken.None);
78-
await Task.WhenAll(Receive());
81+
try
82+
{
83+
var name = _options.ConnectorName;
84+
var uri = new Uri($"ws://{_options.WebSocketHostAddress}");
85+
_webSocket = new ClientWebSocket();
86+
await _webSocket.ConnectAsync(uri, CancellationToken.None);
87+
await Receive();
88+
}
89+
catch (Exception ex)
90+
{
91+
throw ex;
92+
}
93+
finally
94+
{
95+
if (_webSocket != null)
96+
_webSocket.Dispose();
97+
}
7998
}
8099

81100
public async Task SendAsync(WebSocketMessageContext context)

src/NetCoreStack.WebSockets.ProxyClient/ConnectorOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ public class ConnectorOptions
1414
public ConnectorOptions()
1515
{
1616
Invocators = new List<Type>();
17+
ConnectorName = "";
1718
}
1819

20+
public string ConnectorName { get; set; }
1921
public string WebSocketHostAddress { get; set; }
2022
public void RegisterInvocator<TInvocator>(WebSocketCommands commands)
2123
where TInvocator : IClientWebSocketCommandInvocator

src/NetCoreStack.WebSockets.ProxyClient/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static void AddProxyWebSockets(this IServiceCollection services, Action<C
1919
{
2020
foreach (var invocator in connectorOptions.Invocators)
2121
{
22-
services.AddSingleton(invocator);
22+
services.AddTransient(invocator);
2323
}
2424
}
2525

src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ await transport.WebSocket.SendAsync(descriptor.Segments,
5454
CancellationToken.None);
5555
}
5656

57-
private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
57+
private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage, CancellationToken token)
5858
{
5959
if (transport == null)
6060
{
@@ -66,7 +66,7 @@ private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedB
6666
await transport.WebSocket.SendAsync(segments,
6767
WebSocketMessageType.Binary,
6868
endOfMessage,
69-
CancellationToken.None);
69+
token);
7070
}
7171

7272
public async Task BroadcastAsync(WebSocketMessageContext context)
@@ -115,7 +115,7 @@ public async Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties)
115115

116116
foreach (var connection in Connections)
117117
{
118-
await SendBinaryAsync(transport: connection.Value, chunkedBytes: chunkedBytes, endOfMessage: endOfMessage);
118+
await SendBinaryAsync(connection.Value, chunkedBytes, endOfMessage, CancellationToken.None);
119119
}
120120

121121
if (endOfMessage)

src/NetCoreStack.WebSockets/Internal/ConnectionManagerExtensions.cs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
using NetCoreStack.WebSockets.Common;
2+
using System;
23
using System.Net.WebSockets;
4+
using System.Text;
5+
using System.Threading;
36
using System.Threading.Tasks;
47

58
namespace NetCoreStack.WebSockets.Internal
@@ -18,11 +21,36 @@ public static async Task Handshake(this IConnectionManager manager,
1821
context.State = await initState.GetStateAsync();
1922

2023
await manager.SendAsync(transport.ConnectionId, context, webSocket);
21-
await transport.Echo();
24+
await Receive(webSocket);
2225
if (webSocket.State == WebSocketState.Closed)
2326
{
2427
manager.CloseConnection(transport.ConnectionId);
2528
}
2629
}
30+
31+
private static async Task Receive(WebSocket webSocket)
32+
{
33+
var buffer = new byte[SocketsConstants.ChunkSize];
34+
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
35+
while (!result.CloseStatus.HasValue)
36+
{
37+
string content = "<<binary>>";
38+
if (result.MessageType == WebSocketMessageType.Text)
39+
{
40+
content = Encoding.UTF8.GetString(buffer, 0, result.Count);
41+
if (content.Equals("ServerClose"))
42+
{
43+
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing from Server", CancellationToken.None);
44+
}
45+
else if (content.Equals("ServerAbort"))
46+
{
47+
webSocket.Abort();
48+
}
49+
}
50+
51+
result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
52+
}
53+
await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
54+
}
2755
}
2856
}
Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
using System;
22
using System.Net.WebSockets;
3-
using System.Text;
4-
using System.Threading;
5-
using System.Threading.Tasks;
63

74
namespace NetCoreStack.WebSockets.Internal
85
{
@@ -16,50 +13,5 @@ public WebSocketTransport(WebSocket webSocket)
1613
ConnectionId = Guid.NewGuid().ToString();
1714
WebSocket = webSocket;
1815
}
19-
20-
public async Task Echo()
21-
{
22-
var buffer = new byte[1024 * 4];
23-
var result = await WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
24-
while (!result.CloseStatus.HasValue)
25-
{
26-
string content = "<<binary>>";
27-
if (result.MessageType == WebSocketMessageType.Text)
28-
{
29-
content = Encoding.UTF8.GetString(buffer, 0, result.Count);
30-
if (content.Equals("ServerClose"))
31-
{
32-
await WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing from Server", CancellationToken.None);
33-
}
34-
else if (content.Equals("ServerAbort"))
35-
{
36-
WebSocket.Abort();
37-
}
38-
}
39-
40-
result = await WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
41-
LogFrame(result, buffer);
42-
}
43-
await WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
44-
}
45-
46-
private void LogFrame(WebSocketReceiveResult frame, byte[] buffer)
47-
{
48-
var close = frame.CloseStatus != null;
49-
string message;
50-
if (close)
51-
{
52-
message = $"Close: {frame.CloseStatus.Value} {frame.CloseStatusDescription}";
53-
}
54-
else
55-
{
56-
string content = "<<binary>>";
57-
if (frame.MessageType == WebSocketMessageType.Text)
58-
{
59-
content = Encoding.UTF8.GetString(buffer, 0, frame.Count);
60-
}
61-
message = $"{frame.MessageType}: Len={frame.Count}, Fin={frame.EndOfMessage}: {content}";
62-
}
63-
}
6416
}
6517
}

test/ServerTestApp/Controllers/DiscoveryController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public async Task<IActionResult> SendAsync([FromBody]SimpleModel model)
4545
return Ok();
4646
}
4747

48-
[HttpPost(nameof(BrodcastBinaryAsync))]
49-
public async Task<IActionResult> BrodcastBinaryAsync([FromBody]SimpleModel model)
48+
[HttpPost(nameof(BroadcastBinaryAsync))]
49+
public async Task<IActionResult> BroadcastBinaryAsync([FromBody]SimpleModel model)
5050
{
5151
var bytes = _distrubutedCache.Get(model.Key);
5252
await _connectionManager.BroadcastBinaryAsync(bytes, new SocketObject { Key = model.Key });
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using Microsoft.AspNetCore.Mvc.Filters;
2+
using System.Threading.Tasks;
3+
4+
namespace WebClientTestApp
5+
{
6+
public class ClientExceptionFilterAttribute : ExceptionFilterAttribute
7+
{
8+
public override Task OnExceptionAsync(ExceptionContext context)
9+
{
10+
return base.OnExceptionAsync(context);
11+
}
12+
}
13+
}

test/WebClientTestApp/CustomWebSocketCommandInvocator.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ public CustomWebSocketCommandInvocator(IConnectionManager connectionManager)
1414
_connectionManager = connectionManager;
1515
}
1616

17+
private Task InternalMethodAsync()
18+
{
19+
return Task.CompletedTask;
20+
}
21+
1722
public async Task InvokeAsync(WebSocketMessageContext context)
1823
{
1924
if (context.MessageType == WebSocketMessageType.Binary)

test/WebClientTestApp/Startup.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Microsoft.Extensions.Logging;
66
using NetCoreStack.WebSockets;
77
using NetCoreStack.WebSockets.ProxyClient;
8+
using System;
89
using System.IO;
910

1011
namespace WebClientTestApp
@@ -18,6 +19,7 @@ public Startup(IHostingEnvironment env)
1819
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
1920
.AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
2021
.AddEnvironmentVariables();
22+
2123
Configuration = builder.Build();
2224
}
2325

@@ -28,15 +30,18 @@ public void ConfigureServices(IServiceCollection services)
2830
{
2931
// Client WebSocket - DMZ to API side connections
3032
services.AddProxyWebSockets(options => {
33+
options.ConnectorName = Environment.MachineName;
3134
options.WebSocketHostAddress = "localhost:7803";
3235
options.RegisterInvocator<CustomWebSocketCommandInvocator>(WebSocketCommands.All);
3336
});
3437

3538
// WebSockets for Browsers - User Agent ( javascript clients )
3639
services.AddNativeWebSockets();
3740

38-
// Add MVC framework services.
39-
services.AddMvc();
41+
// Add framework services.
42+
services.AddMvc(options => {
43+
options.Filters.Add(new ClientExceptionFilterAttribute());
44+
});
4045
}
4146

4247
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

0 commit comments

Comments
 (0)