From 8b75fdb2770bbfe552a86b956e3f1ccbd3004b80 Mon Sep 17 00:00:00 2001 From: Mauro Agnoletti Date: Tue, 2 Jul 2019 16:45:27 -0300 Subject: [PATCH] Added Integration Tests to cover IMqttClient.MessageStream fixes Also fixed existing tests that started to fail after the fix was applied --- src/IntegrationTests/ConnectionSpec.cs | 19 ++++++ src/IntegrationTests/PublishingSpec.cs | 85 ++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/src/IntegrationTests/ConnectionSpec.cs b/src/IntegrationTests/ConnectionSpec.cs index 4d04d9d1..0fd76d6a 100644 --- a/src/IntegrationTests/ConnectionSpec.cs +++ b/src/IntegrationTests/ConnectionSpec.cs @@ -502,6 +502,25 @@ public async Task when_client_disconnects_unexpectedly_then_will_message_is_sent client3.Dispose(); } + [Fact] + public async Task when_client_disconnects_then_message_stream_completes() + { + var streamCompletedSignal = new ManualResetEventSlim(initialState: false); + var client = await GetClientAsync(); + + await client.ConnectAsync(new MqttClientCredentials(GetClientId())); + + client.MessageStream.Subscribe(_ => { }, onCompleted: () => streamCompletedSignal.Set()); + + await client.DisconnectAsync(); + + var streamCompleted = streamCompletedSignal.Wait(2000); + + Assert.True(streamCompleted); + + client.Dispose(); + } + public void Dispose() => server?.Dispose(); } } diff --git a/src/IntegrationTests/PublishingSpec.cs b/src/IntegrationTests/PublishingSpec.cs index 2388d9ef..3f37b0ac 100644 --- a/src/IntegrationTests/PublishingSpec.cs +++ b/src/IntegrationTests/PublishingSpec.cs @@ -524,8 +524,19 @@ await subscriber }); await subscriber.DisconnectAsync(); + var sessionState = await subscriber.ConnectAsync(new MqttClientCredentials(subscriberId), cleanSession: false); + subscriber + .MessageStream + .Where(m => m.Topic == topic) + .Subscribe(m => { + subscriberReceived++; + + if (subscriberReceived == count) + subscriberDone.Set(); + }); + var tasks = new List(); for (var i = 1; i <= count; i++) @@ -608,6 +619,80 @@ await subscriber.UnsubscribeAsync(topic) publisher.Dispose(); } + [Fact] + public async Task when_publish_messages_and_client_disconnects_then_message_stream_is_reset() + { + var topic = Guid.NewGuid().ToString(); + + var publisher = await GetClientAsync(); + var subscriber = await GetClientAsync(); + var subscriberId = subscriber.Id; + + var goal = default(int); + var goalAchieved = new ManualResetEventSlim(); + var received = 0; + + await subscriber.SubscribeAsync(topic, MqttQualityOfService.AtMostOnce).ConfigureAwait(continueOnCapturedContext: false); + + subscriber + .MessageStream + .Subscribe(m => { + if (m.Topic == topic) + { + received++; + + if (received == goal) + goalAchieved.Set(); + } + }); + + goal = 5; + + var tasks = new List(); + + for (var i = 1; i <= goal; i++) + { + var testMessage = GetTestMessage(i); + var message = new MqttApplicationMessage(topic, Serializer.Serialize(testMessage)); + + tasks.Add(publisher.PublishAsync(message, MqttQualityOfService.AtMostOnce)); + } + + await Task.WhenAll(tasks); + + var completed = goalAchieved.Wait(TimeSpan.FromSeconds(Configuration.WaitTimeoutSecs)); + + Assert.True(completed); + Assert.Equal(goal, received); + + await subscriber.DisconnectAsync(); + + goal = 3; + goalAchieved.Reset(); + received = 0; + completed = false; + + await subscriber.ConnectAsync(new MqttClientCredentials(subscriberId), cleanSession: false); + + for (var i = 1; i <= goal; i++) + { + var testMessage = GetTestMessage(i); + var message = new MqttApplicationMessage(topic, Serializer.Serialize(testMessage)); + + tasks.Add(publisher.PublishAsync(message, MqttQualityOfService.AtMostOnce)); + } + + completed = goalAchieved.Wait(TimeSpan.FromSeconds(Configuration.WaitTimeoutSecs)); + + Assert.False(completed); + Assert.Equal(0, received); + + await subscriber.UnsubscribeAsync(topic).ConfigureAwait(continueOnCapturedContext: false); + + subscriber.Dispose(); + publisher.Dispose(); + } + public void Dispose () { if (server != null) {