Skip to content

Commit 2223eaa

Browse files
authored
Implement Deduplicating Producer (#234)
* Implement DeduplicationProducer Fixes #227 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 280322e commit 2223eaa

File tree

9 files changed

+307
-39
lines changed

9 files changed

+307
-39
lines changed

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.get -> int
658658
RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.set -> void
659659
RabbitMQ.Stream.Client.Reliable.ProducerConfig.ProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream) -> void
660660
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.get -> string
661-
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
662661
RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.get -> RabbitMQ.Stream.Client.Reliable.SuperStreamConfig
663662
RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void
664663
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
22
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
3-
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
3+
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
4+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
5+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
6+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
7+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
8+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
9+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
10+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
11+
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
12+
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public static async Task<IProducer> Create(
5959
ILogger logger = null
6060
)
6161
{
62-
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger).ConfigureAwait(false);
62+
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger)
63+
.ConfigureAwait(false);
6364

6465
var producer = new RawProducer((Client)client, config, logger);
6566
await producer.Init().ConfigureAwait(false);
@@ -224,7 +225,13 @@ private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessa
224225
/// <returns>The last sequence id stored by the producer.</returns>
225226
public async Task<ulong> GetLastPublishingId()
226227
{
227-
var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream).ConfigureAwait(false);
228+
if (string.IsNullOrWhiteSpace(_config.Reference))
229+
{
230+
return 0;
231+
}
232+
233+
var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream)
234+
.ConfigureAwait(false);
228235
ClientExceptions.MaybeThrowException(response.ResponseCode,
229236
$"GetLastPublishingId stream: {_config.Stream}, reference: {_config.Reference}");
230237
return response.Sequence;
@@ -310,7 +317,8 @@ public async Task<ResponseCode> Close()
310317
// in this case we reduce the waiting time
311318
// the producer could be removed because of stream deleted
312319
// so it is not necessary to wait.
313-
var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
320+
var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3))
321+
.ConfigureAwait(false);
314322
result = closeResponse.ResponseCode;
315323
}
316324
catch (Exception e)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
using System.Threading.Tasks;
7+
using Microsoft.Extensions.Logging;
8+
9+
namespace RabbitMQ.Stream.Client.Reliable;
10+
11+
public record DeduplicatingProducerConfig : ProducerConfig
12+
{
13+
public DeduplicatingProducerConfig(StreamSystem streamSystem, string stream, string reference) : base(streamSystem,
14+
stream)
15+
{
16+
if (string.IsNullOrWhiteSpace(reference))
17+
throw new ArgumentException("Reference cannot be null or empty", nameof(reference));
18+
_reference = reference;
19+
}
20+
}
21+
22+
// DeduplicatingProducer is a wrapper around the Producer class
23+
// to handle the deduplication of the messages.
24+
// The deduplication is enabled by setting the reference in the DeduplicatingProducerConfig
25+
// and it is mandatory to set the reference.
26+
// This class is an easy way to use the deduplication feature.
27+
// The low-level API is the RawProducer class; this class sets the right parameters to enable the deduplication.
28+
// The only send API is `Send(ulong publishing, Message message)`. In this case the user has to manage the sequence
29+
// to decide deduplication or not.
30+
// The best way to handle the deduplication is to use a single thread, which avoids overlapping ids.
31+
32+
public class DeduplicatingProducer
33+
{
34+
private Producer _producer = null!;
35+
36+
public static async Task<DeduplicatingProducer> Create(DeduplicatingProducerConfig producerConfig,
37+
ILogger<Producer> logger = null)
38+
{
39+
var x = new DeduplicatingProducer()
40+
{
41+
_producer = await Producer
42+
.Create(
43+
new ProducerConfig(producerConfig.StreamSystem, producerConfig.Stream)
44+
{
45+
_reference = producerConfig.Reference,
46+
ConfirmationHandler = producerConfig.ConfirmationHandler,
47+
ReconnectStrategy = producerConfig.ReconnectStrategy,
48+
ClientProvidedName = producerConfig.ClientProvidedName,
49+
MaxInFlight = producerConfig.MaxInFlight,
50+
MessagesBufferSize = producerConfig.MessagesBufferSize,
51+
TimeoutMessageAfter = producerConfig.TimeoutMessageAfter,
52+
}, logger)
53+
.ConfigureAwait(false)
54+
};
55+
return x;
56+
}
57+
58+
private DeduplicatingProducer()
59+
{
60+
}
61+
62+
// Send a message with a specific publishing id
63+
// the publishing id is used to deduplicate the messages
64+
// the publishing id must be unique and incremental. The publishing ID may have gaps.
65+
// It is important to always increment the ID, otherwise, messages will be discarded by the deduplication algorithm
66+
public async ValueTask Send(ulong publishing, Message message)
67+
{
68+
await _producer.SendInternal(publishing, message).ConfigureAwait(false);
69+
}
70+
71+
public async Task Close()
72+
{
73+
await _producer.Close().ConfigureAwait(false);
74+
}
75+
76+
public bool IsOpen()
77+
{
78+
return _producer.IsOpen();
79+
}
80+
81+
// Get the last publishing id from the producer/reference
82+
// this is useful to know the last id used to deduplicate the messages
83+
// so it is possible to restart the producer with the last id
84+
public async Task<ulong> GetLastPublishedId()
85+
{
86+
return await _producer.GetLastPublishingId().ConfigureAwait(false);
87+
}
88+
}

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,35 @@ public record SuperStreamConfig
1818
public Func<Message, string> Routing { get; set; }
1919
}
2020

21+
[AttributeUsage(AttributeTargets.Method)]
22+
internal class MyMethodAttribute : Attribute
23+
{
24+
public string Message { get; }
25+
26+
public MyMethodAttribute(string message)
27+
{
28+
Message = message;
29+
}
30+
}
31+
2132
public record ProducerConfig : ReliableConfig
2233
{
2334
private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(3);
2435

2536
/// <summary>
26-
/// Reference is mostly used for deduplication.
27-
/// In most of the cases reference is not needed.
37+
/// Reference used for deduplication.
38+
/// For the Producer Class, it is not needed to set this value
39+
/// See DeduplicatingProducer for Deduplication Messages where this value is needed.
2840
/// </summary>
29-
public string Reference { get; set; }
41+
internal string _reference;
42+
43+
public string Reference
44+
{
45+
get { return _reference; }
46+
[Obsolete("Deprecated. Use ClientProvidedName instead. Se DeduplicatingProducer for Deduplication Messages ",
47+
false)]
48+
set { _reference = value; }
49+
}
3050

3151
/// <summary>
3252
/// Publish confirmation callback.<br/>
@@ -113,7 +133,7 @@ public class Producer : ProducerFactory
113133

114134
protected override ILogger BaseLogger => _logger;
115135

116-
private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
136+
private protected Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
117137
{
118138
_producerConfig = producerConfig;
119139
_confirmationPipe = new ConfirmationPipe(
@@ -205,11 +225,25 @@ public override async Task Close()
205225
/// In case of error the message is considered as timed out, you will receive a confirmation with the status TimedOut.
206226
public async ValueTask Send(Message message)
207227
{
208-
209228
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
229+
try
230+
{
231+
await SendInternal(Interlocked.Increment(ref _publishingId), message).ConfigureAwait(false);
232+
}
233+
finally
234+
{
235+
SemaphoreSlim.Release();
236+
}
237+
}
210238

211-
Interlocked.Increment(ref _publishingId);
212-
_confirmationPipe.AddUnConfirmedMessage(_publishingId, message);
239+
internal async Task<ulong> GetLastPublishingId()
240+
{
241+
return await _producer.GetLastPublishingId().ConfigureAwait(false);
242+
}
243+
244+
internal async ValueTask SendInternal(ulong publishingId, Message message)
245+
{
246+
_confirmationPipe.AddUnConfirmedMessage(publishingId, message);
213247
try
214248
{
215249
// This flag avoids some race conditions,
@@ -219,7 +253,7 @@ public async ValueTask Send(Message message)
219253
// on the _waitForConfirmation list. The user will get Timeout Error
220254
if (!(_inReconnection))
221255
{
222-
await _producer.Send(_publishingId, message).ConfigureAwait(false);
256+
await _producer.Send(publishingId, message).ConfigureAwait(false);
223257
}
224258
}
225259

@@ -230,10 +264,6 @@ public async ValueTask Send(Message message)
230264
"Message wont' receive confirmation so you will receive a timeout error",
231265
_producerConfig.Stream);
232266
}
233-
finally
234-
{
235-
SemaphoreSlim.Release();
236-
}
237267
}
238268

239269
/// <summary>
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
using System.Threading.Tasks;
7+
using RabbitMQ.Stream.Client;
8+
using RabbitMQ.Stream.Client.Reliable;
9+
using Xunit;
10+
using Xunit.Abstractions;
11+
12+
namespace Tests;
13+
14+
public class DeduplicationProducerTests
15+
{
16+
private readonly ITestOutputHelper _testOutputHelper;
17+
18+
public DeduplicationProducerTests(ITestOutputHelper testOutputHelper)
19+
{
20+
_testOutputHelper = testOutputHelper;
21+
}
22+
23+
[Fact]
24+
public async Task ValidateDeduplicationProducer()
25+
{
26+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
27+
Assert.Throws<ArgumentException>(() => new DeduplicatingProducerConfig(system, stream, null));
28+
await Assert.ThrowsAsync<ArgumentException>(async () =>
29+
// reference is white space, not valid
30+
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, " ")));
31+
await SystemUtils.CleanUpStreamSystem(system, stream);
32+
}
33+
34+
[Fact]
35+
public async Task GetLastIdShouldBeEqualtoTheMessagesSent()
36+
{
37+
// here we create a producer with a reference
38+
// the reference is used to enable the deduplication
39+
// then we query the sequence externally from the producer to be sure that
40+
// the values are the same
41+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
42+
var testPassed = new TaskCompletionSource<ulong>();
43+
const ulong TotalMessages = 1000UL;
44+
var p = await DeduplicatingProducer.Create(
45+
new DeduplicatingProducerConfig(system, stream, "my_producer_reference")
46+
{
47+
ConfirmationHandler = async confirmation =>
48+
{
49+
if (confirmation.PublishingId == TotalMessages)
50+
testPassed.SetResult(TotalMessages);
51+
await Task.CompletedTask;
52+
},
53+
});
54+
for (ulong i = 1; i <= TotalMessages; i++)
55+
{
56+
await p.Send(i, new Message(new byte[10]));
57+
}
58+
59+
new Utils<ulong>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
60+
SystemUtils.Wait();
61+
Assert.Equal(TotalMessages, await p.GetLastPublishedId());
62+
await p.Close();
63+
Assert.False(p.IsOpen());
64+
65+
// here we query the sequence externally from the producer to be sure that
66+
// the values are the same
67+
Assert.Equal(TotalMessages, await system.QuerySequence("my_producer_reference", stream));
68+
await SystemUtils.CleanUpStreamSystem(system, stream);
69+
}
70+
71+
[Fact]
72+
public async Task DeduplicationInActionSendingTheSameIdMessagesWontStore()
73+
{
74+
// in this test we send the same messages again with the same publishing id
75+
// to see the deduplication in action
76+
77+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
78+
var testPassed = new TaskCompletionSource<ulong>();
79+
const ulong TotalMessages = 1000UL;
80+
var p = await DeduplicatingProducer.Create(
81+
new DeduplicatingProducerConfig(system, stream, "my_producer_reference")
82+
{
83+
ConfirmationHandler = async confirmation =>
84+
{
85+
if (confirmation.PublishingId == TotalMessages)
86+
testPassed.SetResult(TotalMessages);
87+
await Task.CompletedTask;
88+
},
89+
});
90+
// first send and the messages are stored
91+
for (ulong i = 1; i <= TotalMessages; i++)
92+
{
93+
await p.Send(i, new Message(new byte[10]));
94+
}
95+
96+
new Utils<ulong>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
97+
SystemUtils.Wait();
98+
Assert.Equal(TotalMessages, await p.GetLastPublishedId());
99+
100+
// we send the same messages again with the same publishing id
101+
// so the messages won't be stored due to the deduplication
102+
for (ulong i = 1; i <= TotalMessages; i++)
103+
{
104+
await p.Send(i, new Message(new byte[10]));
105+
}
106+
107+
SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages);
108+
109+
// we are out of the deduplication window so the messages will be stored
110+
// we start from the last published id + 1
111+
await p.Send(await p.GetLastPublishedId() + 1, new Message(new byte[10]));
112+
await p.Send(await p.GetLastPublishedId() + 2, new Message(new byte[10]));
113+
114+
// the total messages should be the TotalMessages + 2 new messages
115+
SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages + 2);
116+
await p.Close();
117+
Assert.False(p.IsOpen());
118+
119+
await SystemUtils.CleanUpStreamSystem(system, stream);
120+
}
121+
}

0 commit comments

Comments
 (0)