Skip to content

Commit 8ad2c2c

Browse files
Fix and Improve Streaming Rpc (#47)
* 🐛 Fix streaming rpc * ⚡ Add Reconnect
1 parent d690879 commit 8ad2c2c

9 files changed

+177
-176
lines changed

SharedBuildProperties.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<PropertyGroup>
44
<Product>Solana.Unity</Product>
5-
<Version>2.6.1.2</Version>
5+
<Version>2.6.1.3</Version>
66
<Copyright>Copyright 2022 &#169; Magicblock Labs</Copyright>
77
<Authors>Magicblock Labs</Authors>
88
<PublisherName>Magicblock Labs</PublisherName>

src/Solana.Unity.Rpc/Core/Sockets/IWebSocket.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ internal interface IWebSocket : IDisposable
1414
Task CloseAsync(CancellationToken cancellationToken);
1515
Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken);
1616
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken);
17-
Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken);
17+
18+
public abstract event WebSocketMessageEventHandler OnMessage;
19+
public delegate void WebSocketMessageEventHandler(byte[] data);
20+
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;
1821
}
1922
}

src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs

Lines changed: 13 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa
5656
_logger = logger;
5757
_sem = new SemaphoreSlim(1, 1);
5858
_connectionStats = new ConnectionStats();
59+
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
5960
}
6061

6162
/// <summary>
@@ -70,7 +71,7 @@ public async Task ConnectAsync()
7071
if (ClientSocket.State != WebSocketState.Open)
7172
{
7273
await ClientSocket.ConnectAsync(NodeAddress, CancellationToken.None);
73-
_ = StartListening();
74+
ClientSocket.OnMessage += DispatchMessage;
7475
ConnectionStateChangedEvent?.Invoke(this, State);
7576
}
7677
}
@@ -80,6 +81,16 @@ public async Task ConnectAsync()
8081
}
8182
}
8283

84+
private void DispatchMessage(byte[] message)
85+
{
86+
HandleNewMessage(new Memory<byte>(message));
87+
_connectionStats.AddReceived((uint)message.Length);
88+
if (ClientSocket.State != WebSocketState.Open && ClientSocket.State != WebSocketState.Connecting)
89+
{
90+
ConnectionStateChangedEvent?.Invoke(this, State);
91+
}
92+
}
93+
8394
/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
8495
public async Task DisconnectAsync()
8596
{
@@ -94,6 +105,7 @@ public async Task DisconnectAsync()
94105
//and will also notify when there is a non-user triggered disconnection event
95106

96107
// handle disconnection cleanup
108+
ClientSocket.OnMessage -= DispatchMessage;
97109
ClientSocket.Dispose();
98110
ClientSocket = new WebSocketWrapper();
99111
CleanupSubscriptions();
@@ -105,78 +117,6 @@ public async Task DisconnectAsync()
105117
}
106118
}
107119

108-
/// <summary>
109-
/// Starts listeing to new messages.
110-
/// </summary>
111-
/// <returns>Returns the task representing the asynchronous task.</returns>
112-
private async Task StartListening()
113-
{
114-
while (ClientSocket.State is WebSocketState.Open or WebSocketState.Connecting)
115-
{
116-
try
117-
{
118-
await ReadNextMessage();
119-
}
120-
catch (Exception e)
121-
{
122-
if (_logger != null)
123-
{
124-
Console.WriteLine($"Exception trying to read next message: {e.Message}");
125-
}
126-
}
127-
}
128-
129-
if (_logger != null)
130-
{
131-
Console.WriteLine($"Stopped reading messages. ClientSocket.State changed to {ClientSocket.State}");
132-
}
133-
ConnectionStateChangedEvent?.Invoke(this, State);
134-
}
135-
136-
/// <summary>
137-
/// Reads the next message from the socket.
138-
/// </summary>
139-
/// <param name="cancellationToken">The cancelation token.</param>
140-
/// <returns>Returns the task representing the asynchronous task.</returns>
141-
private async Task ReadNextMessage(CancellationToken cancellationToken = default)
142-
{
143-
var buffer = new byte[32768];
144-
Memory<byte> mem = new(buffer);
145-
WebSocketReceiveResult result = await ClientSocket.ReceiveAsync(mem, cancellationToken);
146-
int count = result.Count;
147-
148-
if (result.MessageType == WebSocketMessageType.Close)
149-
{
150-
await ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
151-
}
152-
else
153-
{
154-
if (!result.EndOfMessage)
155-
{
156-
MemoryStream ms = new MemoryStream();
157-
ms.Write(mem.Span.ToArray(), 0, mem.Span.Length);
158-
159-
160-
while (!result.EndOfMessage)
161-
{
162-
result = await ClientSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false);
163-
164-
var memSlice = mem.Slice(0, result.Count).Span.ToArray();
165-
ms.Write(memSlice, 0, memSlice.Length);
166-
count += result.Count;
167-
}
168-
169-
mem = new Memory<byte>(ms.ToArray());
170-
}
171-
else
172-
{
173-
mem = mem.Slice(0, count);
174-
}
175-
_connectionStats.AddReceived((uint)count);
176-
HandleNewMessage(mem);
177-
}
178-
}
179-
180120
/// <summary>
181121
/// Handless a new message payload.
182122
/// </summary>

src/Solana.Unity.Rpc/Core/Sockets/SubscriptionState.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using Solana.Unity.Rpc.Messages;
2+
using System;
23
using System.Collections.Generic;
34
using System.Linq;
45
using System.Threading.Tasks;
@@ -29,6 +30,11 @@ public abstract class SubscriptionState
2930
/// The current state of the subscription.
3031
/// </summary>
3132
public SubscriptionStatus State { get; protected set; }
33+
34+
/// <summary>
35+
/// The JsonRpcRequest for this subscription.
36+
/// </summary>
37+
internal JsonRpcRequest Request;
3238

3339
/// <summary>
3440
/// The last error message.
@@ -95,6 +101,15 @@ internal void ChangeState(SubscriptionStatus newState, string error = null, stri
95101

96102
/// <inheritdoc cref="Unsubscribe"/>
97103
public async Task UnsubscribeAsync() => await _rpcClient.UnsubscribeAsync(this).ConfigureAwait(false);
104+
105+
/// <summary>
106+
/// Set the request for this subscription.
107+
/// </summary>
108+
/// <param name="request"></param>
109+
public void SetRequest(JsonRpcRequest request)
110+
{
111+
Request = request;
112+
}
98113
}
99114

100115
/// <summary>

src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using NativeWebSocket;
2-
using System;
1+
using System;
32
using System.Net.WebSockets;
43
using System.Threading;
54
using System.Threading.Tasks;
@@ -13,8 +12,10 @@ internal class WebSocketWrapper : IWebSocket
1312
private NativeWebSocket.IWebSocket webSocket;
1413

1514
public WebSocketCloseStatus? CloseStatus => WebSocketCloseStatus.NormalClosure;
16-
15+
1716
public string CloseStatusDescription => "Not implemented";
17+
18+
private TaskCompletionSource<bool> _webSocketConnectionTask = new();
1819

1920
public WebSocketState State
2021
{
@@ -33,37 +34,57 @@ public WebSocketState State
3334
}
3435
}
3536

36-
public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
37-
=> webSocket.Close();
37+
public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription,
38+
CancellationToken cancellationToken)
39+
{
40+
return webSocket.Close();
41+
}
3842

3943
public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
4044
{
4145
webSocket = WebSocket.Create(uri.AbsoluteUri);
46+
webSocket.OnOpen += () =>
47+
{
48+
_webSocketConnectionTask.TrySetResult(true);
49+
webSocket.OnMessage += MessageReceived;
50+
ConnectionStateChangedEvent?.Invoke(this, State);
51+
};
52+
webSocket.OnClose += _ =>
53+
{
54+
webSocket.OnMessage -= MessageReceived;
55+
ConnectionStateChangedEvent?.Invoke(this, State);
56+
};
4257
return webSocket.Connect();
4358
}
4459

60+
private void MessageReceived(byte[] message)
61+
{
62+
OnMessage?.Invoke(message);
63+
}
64+
4565
public Task CloseAsync(CancellationToken cancellationToken)
4666
=> webSocket.Close();
4767

48-
public Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
49-
{
50-
TaskCompletionSource<WebSocketReceiveResult> receiveMessageTask = new();
68+
public event IWebSocket.WebSocketMessageEventHandler OnMessage;
69+
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;
5170

52-
void WebSocketOnOnMessage(byte[] bytes)
71+
public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
72+
{
73+
if (webSocket.State == NativeWebSocket.WebSocketState.Connecting)
5374
{
54-
bytes.CopyTo(buffer);
55-
WebSocketReceiveResult webSocketReceiveResult = new(bytes.Length, WebSocketMessageType.Text, true);
56-
MainThreadUtil.Run(() => receiveMessageTask.SetResult(webSocketReceiveResult));
57-
webSocket.OnMessage -= WebSocketOnOnMessage;
58-
Console.WriteLine("Message received");
75+
return _webSocketConnectionTask.Task.ContinueWith(_ =>
76+
{
77+
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
78+
{
79+
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
80+
}
81+
return webSocket.Send(buffer.ToArray());
82+
}, cancellationToken).Unwrap();
83+
}
84+
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
85+
{
86+
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
5987
}
60-
webSocket.OnMessage += WebSocketOnOnMessage;
61-
return receiveMessageTask.Task;
62-
}
63-
64-
public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage,
65-
CancellationToken cancellationToken)
66-
{
6788
return webSocket.Send(buffer.ToArray());
6889
}
6990

0 commit comments

Comments
 (0)