Skip to content

Commit ac23f56

Browse files
authored
Implement Add/Remove super stream feature (#357)
* Implement Add/Remove super stream feature * add PartitionsSuperStreamSpec to configure the Partitions * add BindingsSuperStreamSpec to configure the Bindings --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 335a0f3 commit ac23f56

22 files changed

+585
-86
lines changed

.ci/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "26.1.2",
3-
"rabbitmq": "3.13.0-rc.2"
3+
"rabbitmq": "3.13.0-rc.4"
44
}

Examples/Performances/BatchVsBatchSend.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ private static async Task<string> RecreateStream(StreamSystem system, string str
231231

232232
Thread.Sleep(5000);
233233

234-
await system.CreateStream(new StreamSpec(stream) { });
234+
await system.CreateStream(new StreamSpec(stream)
235+
{
236+
237+
});
235238
Thread.Sleep(1000);
236239
Console.WriteLine($"Stream: {stream} created");
237240
return stream;

RabbitMQ.Stream.Client/AvailableFeatures.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class AvailableFeatures
1616
public bool PublishFilter { get; private set; }
1717

1818
public bool Is311OrMore { get; private set; }
19+
public bool Is313OrMore { get; private set; }
1920

2021
public string BrokerVersion { get; private set; }
2122

@@ -31,9 +32,9 @@ private static string ExtractVersion(string fullVersion)
3132

3233
public void SetServerVersion(string brokerVersion)
3334
{
34-
var v = ExtractVersion(brokerVersion);
35-
BrokerVersion = v;
36-
Is311OrMore = new System.Version(v) >= new System.Version("3.11.0");
35+
BrokerVersion = ExtractVersion(brokerVersion);
36+
Is311OrMore = new System.Version(BrokerVersion) >= new System.Version("3.11.0");
37+
Is313OrMore = new System.Version(BrokerVersion) >= new System.Version("3.13.0");
3738
}
3839

3940
public void ParseCommandVersions(List<ICommandVersions> commands)

RabbitMQ.Stream.Client/Client.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,14 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
724724
CommandVersionsResponse.Read(frame, out var commandVersionsResponse);
725725
HandleCorrelatedResponse(commandVersionsResponse);
726726
break;
727+
case CreateSuperStreamResponse.Key:
728+
CreateSuperStreamResponse.Read(frame, out var superStreamResponse);
729+
HandleCorrelatedResponse(superStreamResponse);
730+
break;
731+
case DeleteSuperStreamResponse.Key:
732+
DeleteSuperStreamResponse.Read(frame, out var deleteSuperStreamResponse);
733+
HandleCorrelatedResponse(deleteSuperStreamResponse);
734+
break;
727735
default:
728736
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
729737
{
@@ -863,7 +871,6 @@ public async Task<bool> StreamExists(string stream)
863871
if (response.StreamInfos is { Count: >= 1 } &&
864872
response.StreamInfos[stream].ResponseCode == ResponseCode.StreamNotAvailable)
865873
{
866-
867874
ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream);
868875
}
869876

@@ -889,6 +896,21 @@ public async ValueTask<DeleteResponse> DeleteStream(string stream)
889896
.ConfigureAwait(false);
890897
}
891898

899+
public async ValueTask<CreateSuperStreamResponse> CreateSuperStream(string superStream, List<string> partitions,
900+
List<string> bindingKeys, IDictionary<string, string> args)
901+
{
902+
return await Request<CreateSuperStreamRequest, CreateSuperStreamResponse>(corr =>
903+
new CreateSuperStreamRequest(corr, superStream, partitions, bindingKeys, args))
904+
.ConfigureAwait(false);
905+
}
906+
907+
public async ValueTask<DeleteSuperStreamResponse> DeleteSuperStream(string superStream)
908+
{
909+
return await Request<DeleteSuperStreamRequest, DeleteSuperStreamResponse>(corr =>
910+
new DeleteSuperStreamRequest(corr, superStream))
911+
.ConfigureAwait(false);
912+
}
913+
892914
public async ValueTask<bool> Credit(byte subscriptionId, ushort credit)
893915
{
894916
return await Publish(new CreditRequest(subscriptionId, credit)).ConfigureAwait(false);

RabbitMQ.Stream.Client/Consts.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ internal static class Consts
2323
internal const string FilterNotSupported = "Filtering is not supported by the broker "
2424
+ "(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)";
2525

26+
internal const string SuperStreamCreationNotSupported = "SuperStreams creation / deleting not supported by the broker "
27+
+ "(requires RabbitMQ 3.13+. It is possible to use the command line tool to create superstreams)";
28+
2629
internal static int RandomShort()
2730
{
2831
return Random.Shared.Next(500, 1500);
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.Collections.Generic;
8+
using System.Linq;
9+
10+
namespace RabbitMQ.Stream.Client;
11+
12+
internal readonly struct CreateSuperStreamRequest : ICommand
13+
{
14+
private const ushort Key = 29;
15+
private readonly string _superStream;
16+
private readonly uint _corrId;
17+
private readonly IDictionary<string, string> _arguments;
18+
private readonly List<string> _partitions;
19+
private readonly List<string> _bindingKeys;
20+
21+
internal CreateSuperStreamRequest(uint corrId, string superStream,
22+
List<string> partitions, List<string> bindingKeys, IDictionary<string, string> args)
23+
{
24+
_corrId = corrId;
25+
_superStream = superStream;
26+
_partitions = partitions;
27+
_bindingKeys = bindingKeys;
28+
_arguments = args;
29+
}
30+
31+
public int SizeNeeded
32+
{
33+
get
34+
{
35+
var size =
36+
2
37+
+ 2
38+
+ 4
39+
+ WireFormatting.StringSize(_superStream);
40+
41+
size += 4 + _partitions.Sum(WireFormatting.StringSize);
42+
size += 4 + _bindingKeys.Sum(WireFormatting.StringSize);
43+
size += 4
44+
+ _arguments.Sum(x => WireFormatting.StringSize(x.Key) + WireFormatting.StringSize(x.Value));
45+
return size;
46+
}
47+
}
48+
49+
public int Write(Span<byte> span)
50+
{
51+
var command = (ICommand)this;
52+
var offset = WireFormatting.WriteUInt16(span, Key);
53+
offset += WireFormatting.WriteUInt16(span[offset..], command.Version);
54+
offset += WireFormatting.WriteUInt32(span[offset..], _corrId);
55+
offset += WireFormatting.WriteString(span[offset..], _superStream);
56+
offset += WireFormatting.WriteInt32(span[offset..], _partitions.Count);
57+
foreach (var partition in _partitions)
58+
{
59+
offset += WireFormatting.WriteString(span[offset..], partition);
60+
}
61+
62+
offset += WireFormatting.WriteInt32(span[offset..], _bindingKeys.Count);
63+
foreach (var bindingKey in _bindingKeys)
64+
{
65+
offset += WireFormatting.WriteString(span[offset..], bindingKey);
66+
}
67+
68+
offset += WireFormatting.WriteInt32(span[offset..], _arguments.Count);
69+
foreach (var (key, value) in _arguments)
70+
{
71+
offset += WireFormatting.WriteString(span[offset..], key);
72+
offset += WireFormatting.WriteString(span[offset..], value);
73+
}
74+
75+
return offset;
76+
}
77+
}
78+
79+
public readonly struct CreateSuperStreamResponse : ICommand
80+
{
81+
public const ushort Key = 29;
82+
private readonly uint _correlationId;
83+
private readonly ushort _responseCode;
84+
85+
private CreateSuperStreamResponse(uint correlationId, ushort responseCode)
86+
{
87+
_correlationId = correlationId;
88+
_responseCode = responseCode;
89+
}
90+
91+
public int SizeNeeded => throw new NotImplementedException();
92+
93+
public uint CorrelationId => _correlationId;
94+
95+
public ResponseCode ResponseCode => (ResponseCode)_responseCode;
96+
97+
public int Write(Span<byte> span)
98+
{
99+
throw new NotImplementedException();
100+
}
101+
102+
internal static int Read(ReadOnlySequence<byte> frame, out CreateSuperStreamResponse command)
103+
{
104+
var offset = WireFormatting.ReadUInt16(frame, out _);
105+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _);
106+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var correlation);
107+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode);
108+
command = new CreateSuperStreamResponse(correlation, responseCode);
109+
return offset;
110+
}
111+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System;
6+
using System.Buffers;
7+
8+
namespace RabbitMQ.Stream.Client;
9+
10+
internal readonly struct DeleteSuperStreamRequest : ICommand
11+
{
12+
private readonly uint _correlationId;
13+
private readonly string _superStream;
14+
private const ushort Key = 30;
15+
16+
public DeleteSuperStreamRequest(uint correlationId, string superStream)
17+
{
18+
_correlationId = correlationId;
19+
_superStream = superStream;
20+
}
21+
22+
public int SizeNeeded => 8 + WireFormatting.StringSize(_superStream);
23+
24+
public int Write(Span<byte> span)
25+
{
26+
var offset = WireFormatting.WriteUInt16(span, Key);
27+
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
28+
offset += WireFormatting.WriteUInt32(span[offset..], _correlationId);
29+
offset += WireFormatting.WriteString(span[offset..], _superStream);
30+
return offset;
31+
}
32+
}
33+
34+
public readonly struct DeleteSuperStreamResponse : ICommand
35+
{
36+
public const ushort Key = 30;
37+
private readonly uint _correlationId;
38+
private readonly ushort _responseCode;
39+
40+
public DeleteSuperStreamResponse(uint correlationId, ushort responseCode)
41+
{
42+
_correlationId = correlationId;
43+
_responseCode = responseCode;
44+
}
45+
46+
public int SizeNeeded => throw new NotImplementedException();
47+
48+
public uint CorrelationId => _correlationId;
49+
50+
public ResponseCode ResponseCode => (ResponseCode)_responseCode;
51+
52+
public int Write(Span<byte> span)
53+
{
54+
throw new NotImplementedException();
55+
}
56+
57+
internal static int Read(ReadOnlySequence<byte> frame, out DeleteSuperStreamResponse command)
58+
{
59+
var offset = WireFormatting.ReadUInt16(frame, out _);
60+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _);
61+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var correlation);
62+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode);
63+
command = new DeleteSuperStreamResponse(correlation, responseCode);
64+
return offset;
65+
}
66+
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ig
33
abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string
44
abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string
55
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CreateNewEntity(bool boot) -> System.Threading.Tasks.Task
6+
const RabbitMQ.Stream.Client.CreateSuperStreamResponse.Key = 29 -> ushort
7+
const RabbitMQ.Stream.Client.DeleteSuperStreamResponse.Key = 30 -> ushort
68
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
79
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
810
override RabbitMQ.Stream.Client.Broker.ToString() -> string
@@ -27,14 +29,19 @@ RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.Auth
2729
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
2830
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
2931
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
32+
RabbitMQ.Stream.Client.BindingsSuperStreamSpec
33+
RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingKeys.get -> string[]
34+
RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Name, string[] bindingKeys) -> void
3035
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
3136
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
3237
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
3338
RabbitMQ.Stream.Client.Client.ClientId.get -> string
3439
RabbitMQ.Stream.Client.Client.ClientId.init -> void
3540
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
41+
RabbitMQ.Stream.Client.Client.CreateSuperStream(string superStream, System.Collections.Generic.List<string> partitions, System.Collections.Generic.List<string> bindingKeys, System.Collections.Generic.IDictionary<string, string> args) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.CreateSuperStreamResponse>
3642
RabbitMQ.Stream.Client.Client.DeclarePublisher(string publisherRef, string stream, System.Action<System.ReadOnlyMemory<ulong>> confirmCallback, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]> errorCallback, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.DeclarePublisherResponse)>
3743
RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.DeletePublisherResponse>
44+
RabbitMQ.Stream.Client.Client.DeleteSuperStream(string superStream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.DeleteSuperStreamResponse>
3845
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
3946
RabbitMQ.Stream.Client.Client.Publishers.get -> System.Collections.Generic.IDictionary<byte, (string, (System.Action<System.ReadOnlyMemory<ulong>>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))>
4047
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
@@ -94,6 +101,19 @@ RabbitMQ.Stream.Client.CreateException.CreateException(string s, RabbitMQ.Stream
94101
RabbitMQ.Stream.Client.CreateException.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
95102
RabbitMQ.Stream.Client.CreateException.ResponseCode.init -> void
96103
RabbitMQ.Stream.Client.CreateProducerException.CreateProducerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
104+
RabbitMQ.Stream.Client.CreateSuperStreamResponse
105+
RabbitMQ.Stream.Client.CreateSuperStreamResponse.CorrelationId.get -> uint
106+
RabbitMQ.Stream.Client.CreateSuperStreamResponse.CreateSuperStreamResponse() -> void
107+
RabbitMQ.Stream.Client.CreateSuperStreamResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
108+
RabbitMQ.Stream.Client.CreateSuperStreamResponse.SizeNeeded.get -> int
109+
RabbitMQ.Stream.Client.CreateSuperStreamResponse.Write(System.Span<byte> span) -> int
110+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse
111+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.CorrelationId.get -> uint
112+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.DeleteSuperStreamResponse() -> void
113+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.DeleteSuperStreamResponse(uint correlationId, ushort responseCode) -> void
114+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
115+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.SizeNeeded.get -> int
116+
RabbitMQ.Stream.Client.DeleteSuperStreamResponse.Write(System.Span<byte> span) -> int
97117
RabbitMQ.Stream.Client.EntityCommonConfig
98118
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string
99119
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
@@ -160,6 +180,9 @@ RabbitMQ.Stream.Client.MessageContext.ChunkMessagesCount.get -> uint
160180
RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp, uint chunkMessagesCount, ulong chunkId) -> void
161181
RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTime dateTime) -> void
162182
RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOffset dateTimeOffset) -> void
183+
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec
184+
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int
185+
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void
163186
RabbitMQ.Stream.Client.ProducerFilter
164187
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
165188
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
@@ -281,14 +304,25 @@ RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
281304
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
282305
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
283306
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamProducer>
307+
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStream(RabbitMQ.Stream.Client.SuperStreamSpec spec) -> System.Threading.Tasks.Task
284308
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
309+
RabbitMQ.Stream.Client.StreamSystem.DeleteSuperStream(string superStream) -> System.Threading.Tasks.Task
285310
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
286311
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
287312
RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
288313
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
289314
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
290315
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig
291316
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.set -> void
317+
RabbitMQ.Stream.Client.SuperStreamSpec
318+
RabbitMQ.Stream.Client.SuperStreamSpec.Args.get -> System.Collections.Generic.IDictionary<string, string>
319+
RabbitMQ.Stream.Client.SuperStreamSpec.LeaderLocator.set -> void
320+
RabbitMQ.Stream.Client.SuperStreamSpec.MaxAge.set -> void
321+
RabbitMQ.Stream.Client.SuperStreamSpec.MaxLengthBytes.set -> void
322+
RabbitMQ.Stream.Client.SuperStreamSpec.MaxSegmentSizeBytes.set -> void
323+
RabbitMQ.Stream.Client.SuperStreamSpec.Name.get -> string
324+
RabbitMQ.Stream.Client.SuperStreamSpec.Name.init -> void
325+
RabbitMQ.Stream.Client.SuperStreamSpec.SuperStreamSpec(string Name) -> void
292326
RabbitMQ.Stream.Client.TooManyConnectionsException
293327
RabbitMQ.Stream.Client.TooManyConnectionsException.TooManyConnectionsException(string s) -> void
294328
RabbitMQ.Stream.Client.UnknownCommandException

0 commit comments

Comments
 (0)