Skip to content

Commit a9418a1

Browse files
committed
Added tests
1 parent 78dea32 commit a9418a1

File tree

3 files changed

+137
-48
lines changed

3 files changed

+137
-48
lines changed

src/Libplanet.Net/Protocols/ReqRepProtocol.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public async Task DialAsync(
4646
try
4747
{
4848
_transport.RequestMessageToSend += eventHandler;
49+
_logger.Debug(
50+
"Ready to send a request message to {RemotePeer} as {LocalPeer}",
51+
context.RemotePeer.Address,
52+
context.LocalPeer.Address);
4953
await SendAndReceiveMessage(request, downChannel, context);
5054
}
5155
finally

src/Libplanet.Net/Transports/Libp2pTransport.cs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.Threading;
66
using System.Threading.Channels;
77
using System.Threading.Tasks;
8-
using Dasync.Collections;
98
using Libplanet.Crypto;
109
using Libplanet.Net.Messages;
1110
using Libplanet.Net.Options;
@@ -21,6 +20,8 @@ namespace Libplanet.Net.Transports
2120
public class Libp2pTransport : ITransport
2221
#pragma warning restore S101
2322
{
23+
public const int DialProtocolDelay = 100;
24+
2425
private readonly PrivateKey _privateKey;
2526
private readonly ILogger _logger;
2627
private readonly HostOptions _hostOptions;
@@ -33,6 +34,7 @@ public class Libp2pTransport : ITransport
3334
private Multiaddress? _listenerAddress = null;
3435

3536
private CancellationTokenSource _runtimeCancellationTokenSource;
37+
private bool _running = false;
3638
private bool _disposed = false;
3739

3840
public Libp2pTransport(
@@ -81,7 +83,7 @@ public Libp2pTransport(
8183

8284
public DateTimeOffset? LastMessageTimestamp { get; }
8385

84-
public bool Running { get; }
86+
public bool Running => _running;
8587

8688
public AppProtocolVersion AppProtocolVersion { get; }
8789

@@ -113,12 +115,14 @@ public static async Task<Libp2pTransport> Create(
113115
public Task StartAsync(CancellationToken cancellationToken = default)
114116
{
115117
// Does nothing.
118+
_running = true;
116119
return Task.CompletedTask;
117120
}
118121

119122
public Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToken = default)
120123
{
121124
// Does nothing.
125+
_running = false;
122126
_runtimeCancellationTokenSource.Cancel();
123127
return Task.CompletedTask;
124128
}
@@ -155,13 +159,6 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
155159
bool returnWhenTimeout,
156160
CancellationToken cancellationToken)
157161
{
158-
_logger.Information(
159-
"Trying to dial {Remote} As {LocalPeer}",
160-
peer.Multiaddress,
161-
LocalPeer.Address);
162-
IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress, default);
163-
_logger.Information("Dialing to {Remote} successful", peer.Multiaddress);
164-
165162
// FIXME: There should be default maximum timeout.
166163
CancellationTokenSource timerCts = new CancellationTokenSource();
167164
if (timeout is { } timeoutNotNull)
@@ -175,6 +172,16 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
175172
cancellationToken,
176173
timerCts.Token);
177174

175+
_logger.Verbose(
176+
"Trying to dial {Remote} as {LocalPeer}",
177+
peer.Multiaddress,
178+
LocalPeer.Address);
179+
IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress);
180+
_logger.Verbose(
181+
"Dialing to {Remote} as {LocalPeer} was successful",
182+
peer.Multiaddress,
183+
LocalPeer.Address);
184+
178185
// FIXME: Add logging.
179186
_ = remote.DialAsync<ReqRepProtocol>(linkedCts.Token);
180187

@@ -188,12 +195,12 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
188195
// FIXME: The tasks may not be ready to consume the message.
189196
// There needs to be a way to know whether the connection is ready
190197
// to consume the message.
191-
await Task.Delay(100);
198+
await Task.Delay(DialProtocolDelay);
192199
Channel<Message> inboundReplyChannel = Channel.CreateUnbounded<Message>();
193200
_logger.Information("Invoking sending message");
194201
RequestMessageToSend?.Invoke(
195202
this,
196-
(peer.Multiaddress, message, 1, inboundReplyChannel));
203+
(remote.Address, message, 1, inboundReplyChannel));
197204

198205
List<Message> replyMessages = new List<Message>();
199206

@@ -248,26 +255,33 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
248255
replyMessages.Count);
249256
}
250257

251-
return returnWhenTimeout
252-
? replyMessages
253-
: new List<Message>();
258+
#pragma warning disable S3358 // Extract this ternary expresion.
259+
return timerCts.IsCancellationRequested
260+
? returnWhenTimeout
261+
? replyMessages
262+
: new List<Message>()
263+
: replyMessages;
264+
#pragma warning restore S3358
254265
}
255266

256267
public void BroadcastMessage(IEnumerable<BoundPeer> peers, MessageContent content)
257268
{
258269
if (_disposed)
259270
{
260-
throw new ObjectDisposedException(nameof(NetMQTransport));
271+
throw new ObjectDisposedException(nameof(Libp2pTransport));
261272
}
262273

263274
CancellationToken ct = _runtimeCancellationTokenSource.Token;
264275
List<BoundPeer> boundPeers = peers.ToList();
265-
Task.Run(
266-
async () =>
267-
await boundPeers.ParallelForEachAsync(
268-
peer => SendMessageAsync(peer, content, TimeSpan.FromSeconds(1), ct),
269-
ct),
270-
ct);
276+
277+
// FIXME: Parallel does not work.
278+
// Also should catch an exception and ignore it.
279+
foreach (var boundPeer in boundPeers)
280+
{
281+
_ = SendMessageAsync(boundPeer, content, TimeSpan.FromSeconds(1), ct)
282+
.GetAwaiter()
283+
.GetResult();
284+
}
271285

272286
_logger.Debug(
273287
"Broadcasting message {Message} as {AsPeer} to {PeerCount} peers",
@@ -298,16 +312,12 @@ private async Task Initialize(
298312
{
299313
// FIXME: Host being null should be dealt with.
300314
_logger.Information("Initialization started");
301-
string localPeerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/0";
302-
string listenerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}";
315+
string addressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}";
303316
IPeerFactory peerFactory = serviceProvider.GetService<IPeerFactory>()!;
304-
_logger.Information("Peer factory obtained");
305317
ILocalPeer localPeer = peerFactory.Create(
306-
CryptoKeyConverter.ToLibp2pIdentity(_privateKey),
307-
localPeerAddressTemplate);
318+
CryptoKeyConverter.ToLibp2pIdentity(_privateKey), addressTemplate);
308319
_logger.Information("Local peer created at {LocalPeerAddress}", localPeer.Address);
309-
IListener listener = await localPeer.ListenAsync(
310-
listenerAddressTemplate, cancellationToken);
320+
IListener listener = await localPeer.ListenAsync(addressTemplate, cancellationToken);
311321
_logger.Information("Listener started at {ListenerAddress}", listener.Address);
312322

313323
_localPeer = localPeer;

test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs

Lines changed: 95 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.Linq;
55
using System.Net;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Libplanet.Crypto;
89
using Libplanet.Net.Messages;
@@ -18,8 +19,10 @@
1819

1920
namespace Libplanet.Net.Tests.Transports
2021
{
22+
[CollectionDefinition(nameof(Libp2pTransportTest), DisableParallelization = true)]
2123
public class Libp2pTransportTest : IDisposable
2224
{
25+
public const int Timeout = 30_000;
2326
private bool _disposed;
2427
private ILogger _logger;
2528

@@ -41,7 +44,7 @@ public Libp2pTransportTest(ITestOutputHelper testOutputHelper)
4144
Dispose(false);
4245
}
4346

44-
[Fact(Timeout = 10_000)]
47+
[Fact(Timeout = Timeout)]
4548
public async Task Initialize()
4649
{
4750
PrivateKey privateKey = new PrivateKey();
@@ -67,10 +70,16 @@ public async Task Initialize()
6770
Assert.Equal(expected, transport.AsPeer);
6871
}
6972

70-
[Fact(Timeout = 10_000)]
71-
public async Task DialToListeners()
73+
[Theory(Timeout = Timeout)]
74+
[InlineData(true)]
75+
[InlineData(false)]
76+
public async Task DialToListeners(bool usePortZero)
7277
{
78+
// NOTE: Using port 0 does not work for this test.
7379
int count = 2;
80+
List<int> freePorts = usePortZero
81+
? Enumerable.Range(0, count).Select(_ => 0).ToList()
82+
: TestUtils.GetFreePorts(count);
7483
List<PrivateKey> privateKeys = Enumerable
7584
.Range(0, count)
7685
.Select(_ => new PrivateKey())
@@ -80,7 +89,7 @@ public async Task DialToListeners()
8089
.Select(i => new Libp2pTransport(
8190
privateKeys[i],
8291
new AppProtocolVersionOptions(),
83-
new HostOptions("127.0.0.1", new IceServer[] { }, 0)))
92+
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i])))
8493
.ToList();
8594
List<IServiceProvider> serviceProviders = transports
8695
.Select(transport => GetServiceProvider(transport))
@@ -92,11 +101,12 @@ public async Task DialToListeners()
92101
.Range(0, count)
93102
.Select(i => peerFactories[i].Create(
94103
CryptoKeyConverter.ToLibp2pIdentity(privateKeys[i]),
95-
"/ip4/127.0.0.1/tcp/0"))
104+
$"/ip4/127.0.0.1/tcp/{freePorts[i]}"))
96105
.ToList();
97-
List<IListener> listeners = localPeers
98-
.Select(async localPeer =>
99-
await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default))
106+
List<IListener> listeners = Enumerable
107+
.Range(0, count)
108+
.Select(async i =>
109+
await localPeers[i].ListenAsync($"/ip4/127.0.0.1/tcp/{freePorts[i]}", default))
100110
.Select(task => task.Result)
101111
.ToList();
102112
List<Multiaddress> listenerAddresses = listeners
@@ -110,15 +120,37 @@ await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default))
110120
Assert.Equal(listenerAddresses[0], remote1.Address);
111121
}
112122

113-
[Fact(Timeout = 10_000)]
114-
public async Task DialToTransports()
123+
[Fact(Timeout = Timeout)]
124+
public async Task DialCancel()
125+
{
126+
PrivateKey privateKey = new PrivateKey();
127+
List<int> freePorts = TestUtils.GetFreePorts(2);
128+
Libp2pTransport transport = await Libp2pTransport.Create(
129+
privateKey,
130+
new AppProtocolVersionOptions(),
131+
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[0]));
132+
133+
Identity identity = CryptoKeyConverter.ToLibp2pIdentity(new PrivateKey());
134+
Multiaddress badAddress = $"/ip4/127.0.0.1/tcp/{freePorts[1]}/p2p/{identity.PeerId}";
135+
CancellationTokenSource cts = new CancellationTokenSource();
136+
cts.CancelAfter(1_000);
137+
await Assert.ThrowsAsync<TaskCanceledException>(
138+
async () => await transport.LocalPeer.DialAsync(badAddress, cts.Token));
139+
}
140+
141+
[Theory(Timeout = Timeout)]
142+
[InlineData(true)]
143+
[InlineData(false)]
144+
public async Task DialToTransports(bool usePortZero)
115145
{
116146
int count = 2;
147+
List<int> freePorts = usePortZero
148+
? Enumerable.Range(0, count).Select(_ => 0).ToList()
149+
: TestUtils.GetFreePorts(count);
117150
List<PrivateKey> privateKeys = Enumerable
118151
.Range(0, count)
119152
.Select(_ => new PrivateKey())
120153
.ToList();
121-
List<int> freePorts = TestUtils.GetFreePorts(2);
122154
List<HostOptions> hosts = freePorts
123155
.Select(freePort => new HostOptions("127.0.0.1", new IceServer[] { }, freePort))
124156
.ToList();
@@ -140,15 +172,19 @@ public async Task DialToTransports()
140172
Assert.Equal(transports[0].ListenerAddress, remote1.Address);
141173
}
142174

143-
[Fact(Timeout = 10_000)]
144-
public async Task RequestReply()
175+
[Theory(Timeout = Timeout)]
176+
[InlineData(true)]
177+
[InlineData(false)]
178+
public async Task RequestReply(bool usePortZero)
145179
{
146180
int count = 2;
181+
List<int> freePorts = usePortZero
182+
? Enumerable.Range(0, count).Select(_ => 0).ToList()
183+
: TestUtils.GetFreePorts(count);
147184
List<PrivateKey> privateKeys = Enumerable
148185
.Range(0, count)
149186
.Select(_ => new PrivateKey())
150187
.ToList();
151-
List<int> freePorts = TestUtils.GetFreePorts(2);
152188
List<Libp2pTransport> transports = Enumerable
153189
.Range(0, count)
154190
.Select(async i => await Libp2pTransport.Create(
@@ -166,15 +202,54 @@ public async Task RequestReply()
166202
}
167203
});
168204

169-
List<Message> reply = (await transports[0].SendMessageAsync(
205+
Message reply = await transports[0].SendMessageAsync(
170206
transports[1].AsPeer,
171207
new PingMsg(),
172208
TimeSpan.FromSeconds(5),
173-
1,
174-
true,
175-
default)).ToList();
176-
Message single = Assert.Single<Message>(reply);
177-
Assert.IsType<PongMsg>(single.Content);
209+
default);
210+
Assert.IsType<PongMsg>(reply.Content);
211+
}
212+
213+
[Theory(Timeout = Timeout)]
214+
[InlineData(true)]
215+
[InlineData(false)]
216+
public async Task Broadcast(bool usePortZero)
217+
{
218+
int count = 4;
219+
List<int> freePorts = usePortZero
220+
? Enumerable.Range(0, count).Select(_ => 0).ToList()
221+
: TestUtils.GetFreePorts(count);
222+
List<PrivateKey> privateKeys = Enumerable
223+
.Range(0, count)
224+
.Select(_ => new PrivateKey())
225+
.ToList();
226+
List<Libp2pTransport> transports = Enumerable
227+
.Range(0, count)
228+
.Select(async i => await Libp2pTransport.Create(
229+
privateKeys[i],
230+
new AppProtocolVersionOptions(),
231+
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i])))
232+
.Select(task => task.Result)
233+
.ToList();
234+
235+
int receivedCount = 0;
236+
foreach (var transport in transports.Skip(1).ToList())
237+
{
238+
transport.ProcessMessageHandler.Register(async (message, channel) =>
239+
{
240+
if (message.Content is PingMsg)
241+
{
242+
await channel.Writer.WriteAsync(new PongMsg());
243+
receivedCount++;
244+
}
245+
});
246+
}
247+
248+
transports[0].BroadcastMessage(
249+
transports.Skip(1).Select(transport => transport.AsPeer).ToList(),
250+
new PingMsg());
251+
await Task.Delay(1_000);
252+
Assert.Equal(count - 1, receivedCount);
178253
}
179254

180255
public void Dispose()

0 commit comments

Comments
 (0)