Skip to content

Commit aadb40d

Browse files
fix: streaming rpc (#60)
1 parent af49063 commit aadb40d

File tree

4 files changed

+40
-4
lines changed

4 files changed

+40
-4
lines changed

src/Solana.Unity.Rpc/ClientFactory.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using NativeWebSocket;
12
using Solana.Unity.Rpc.Utilities;
23
using System.Net.Http;
34
using System.Net.WebSockets;
@@ -161,6 +162,20 @@ public static IStreamingRpcClient GetStreamingClient(
161162
};
162163
return GetStreamingClient(url, logger);
163164
}
165+
166+
public static IStreamingRpcClient GetStreamingClient(
167+
Cluster cluster,
168+
IWebSocket socket)
169+
{
170+
var url = cluster switch
171+
{
172+
Cluster.DevNet => StreamingRpcDevNet,
173+
Cluster.TestNet => StreamingRpcTestNet,
174+
Cluster.LocalNet => StreamingRpcLocalNet,
175+
_ => StreamingRpcMainNet,
176+
};
177+
return GetStreamingClient(url, null, socket, null);
178+
}
164179

165180
/// <summary>
166181
/// Instantiate a streaming client.
@@ -169,7 +184,7 @@ public static IStreamingRpcClient GetStreamingClient(
169184
/// <param name="logger">The logger.</param>
170185
/// <param name="clientWebSocket">A ClientWebSocket instance. If null, a new instance will be created.</param>
171186
/// <returns>The streaming client.</returns>
172-
public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, ClientWebSocket clientWebSocket = null)
187+
public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, IWebSocket socket = null, ClientWebSocket clientWebSocket = null)
173188
{
174189
return new SolanaStreamingRpcClient(url, logger, null, clientWebSocket);
175190
}

src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using Solana.Unity.Rpc.Types;
22
using System;
3-
using System.IO;
43
using System.Net.WebSockets;
54
using System.Threading;
65
using System.Threading.Tasks;
@@ -58,14 +57,30 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa
5857
_connectionStats = new ConnectionStats();
5958
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
6059
}
60+
61+
/// <summary>
62+
/// Constructor that setups the client with a IWebSocket instance.
63+
/// </summary>
64+
/// <param name="url"></param>
65+
/// <param name="socket"></param>
66+
protected StreamingRpcClient(string url, IWebSocket socket)
67+
{
68+
NodeAddress = new Uri(url);
69+
ClientSocket = socket ?? new WebSocketWrapper();
70+
_logger = null;
71+
_sem = new SemaphoreSlim(1, 1);
72+
_connectionStats = new ConnectionStats();
73+
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
74+
}
6175

6276
/// <summary>
6377
/// Initializes the websocket connection and starts receiving messages asynchronously.
6478
/// </summary>
6579
/// <returns>Returns the task representing the asynchronous task.</returns>
6680
public async Task ConnectAsync()
6781
{
68-
_sem.Wait();
82+
if (ClientSocket.State == WebSocketState.Open) return;
83+
await _sem.WaitAsync().ConfigureAwait(false);
6984
try
7085
{
7186
if (ClientSocket.State != WebSocketState.Open)
@@ -94,7 +109,8 @@ private void DispatchMessage(byte[] message)
94109
/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
95110
public async Task DisconnectAsync()
96111
{
97-
_sem.Wait();
112+
if (ClientSocket.State == WebSocketState.Closed) return;
113+
await _sem.WaitAsync().ConfigureAwait(false);
98114
try
99115
{
100116
if (ClientSocket.State == WebSocketState.Open)

src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
4545
webSocket = WebSocket.Create(uri.AbsoluteUri);
4646
webSocket.OnOpen += () =>
4747
{
48+
MainThreadUtil.Run(() =>
49+
{
50+
_webSocketConnectionTask.TrySetResult(true);
51+
});
4852
_webSocketConnectionTask.TrySetResult(true);
4953
webSocket.OnMessage += MessageReceived;
5054
ConnectionStateChangedEvent?.Invoke(this, State);

src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ internal SolanaStreamingRpcClient(string url, object logger = null, IWebSocket w
6060
};
6161
}
6262

63+
6364
/// <summary>
6465
/// Try Reconnect to the server and reopening the confirmed subscription.
6566
/// </summary>

0 commit comments

Comments
 (0)