Skip to content

Commit e5445ef

Browse files
Resolve address via a function (#431)
* add IAdressResolver and add a new implementation that can dynamically return an Endpoint * set Endpoint attribute back to public to not break existing usage --------- Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent c75f4d5 commit e5445ef

File tree

8 files changed

+58
-13
lines changed

8 files changed

+58
-13
lines changed

RabbitMQ.Stream.Client/AddressResolver.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.Stream.Client
88
{
9-
public class AddressResolver
9+
public class AddressResolver : IAddressResolver
1010
{
1111
public AddressResolver(EndPoint endPoint)
1212
{
@@ -16,5 +16,6 @@ public AddressResolver(EndPoint endPoint)
1616

1717
public EndPoint EndPoint { get; set; }
1818
public bool Enabled { get; set; }
19+
public EndPoint Resolve(string address, int host) => EndPoint;
1920
}
2021
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System;
6+
using System.Net;
7+
8+
namespace RabbitMQ.Stream.Client;
9+
10+
public class AddressResolverDynamic : IAddressResolver
11+
{
12+
private readonly Func<string, int, EndPoint> _resolveFunction;
13+
14+
public AddressResolverDynamic(Func<string, int, EndPoint> resolveFunction)
15+
{
16+
_resolveFunction = resolveFunction;
17+
Enabled = true;
18+
}
19+
20+
public bool Enabled { get; set; }
21+
public EndPoint Resolve(string address, int host) => _resolveFunction(address, host);
22+
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public string ClientProvidedName
7272
/// </summary>
7373
public SslOption Ssl { get; set; } = new SslOption();
7474

75-
public AddressResolver AddressResolver { get; set; } = null;
75+
public IAddressResolver AddressResolver { get; set; } = null;
7676

7777
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
7878

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Net;
6+
7+
namespace RabbitMQ.Stream.Client;
8+
9+
public interface IAddressResolver
10+
{
11+
public bool Enabled { get; }
12+
public EndPoint Resolve(string address, int host);
13+
}

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,6 @@ override RabbitMQ.Stream.Client.Reliable.Producer.Close() -> System.Threading.Ta
7979
override RabbitMQ.Stream.Client.Reliable.Producer.ToString() -> string
8080
RabbitMQ.Stream.Client.AbstractEntity
8181
RabbitMQ.Stream.Client.AbstractEntity.AbstractEntity() -> void
82-
RabbitMQ.Stream.Client.AddressResolver
83-
RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void
84-
RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool
85-
RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void
86-
RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint
87-
RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void
8882
RabbitMQ.Stream.Client.AMQP.AmqpParseException
8983
RabbitMQ.Stream.Client.AMQP.AmqpParseException.AmqpParseException(string s) -> void
9084
RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting
@@ -187,8 +181,6 @@ RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong
187181
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
188182
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
189183
RabbitMQ.Stream.Client.ClientParameters
190-
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
191-
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
192184
RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.get -> string
193185
RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.set -> void
194186
RabbitMQ.Stream.Client.ClientParameters.Endpoint.get -> System.Net.EndPoint
@@ -724,7 +716,7 @@ RabbitMQ.Stream.Client.StreamSystem.QueryPartition(string superStream) -> System
724716
RabbitMQ.Stream.Client.StreamSystem.QuerySequence(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
725717
RabbitMQ.Stream.Client.StreamSystem.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
726718
RabbitMQ.Stream.Client.StreamSystemConfig
727-
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
719+
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver
728720
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.set -> void
729721
RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.get -> string
730722
RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.set -> void

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityComm
2222
RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void
2323
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
2424
RabbitMQ.Stream.Client.AbstractEntity.UpdateStatusToClosed() -> void
25+
RabbitMQ.Stream.Client.AddressResolver
26+
RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void
27+
RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool
28+
RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void
29+
RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint
30+
RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void
31+
RabbitMQ.Stream.Client.AddressResolver.Resolve(string address, int host) -> System.Net.EndPoint
32+
RabbitMQ.Stream.Client.AddressResolverDynamic
33+
RabbitMQ.Stream.Client.AddressResolverDynamic.AddressResolverDynamic(System.Func<string, int, System.Net.EndPoint> resolveFunction) -> void
34+
RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.get -> bool
35+
RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.set -> void
36+
RabbitMQ.Stream.Client.AddressResolverDynamic.Resolve(string address, int host) -> System.Net.EndPoint
2537
RabbitMQ.Stream.Client.AlreadyClosedException
2638
RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void
2739
RabbitMQ.Stream.Client.AuthMechanism
@@ -53,6 +65,8 @@ RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IO
5365
RabbitMQ.Stream.Client.Client.SuperStreamExists(string stream) -> System.Threading.Tasks.Task<bool>
5466
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
5567
RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
68+
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver
69+
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
5670
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
5771
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
5872
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
@@ -147,6 +161,9 @@ RabbitMQ.Stream.Client.FlowControl.Strategy.get -> RabbitMQ.Stream.Client.Consum
147161
RabbitMQ.Stream.Client.FlowControl.Strategy.set -> void
148162
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
149163
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
164+
RabbitMQ.Stream.Client.IAddressResolver
165+
RabbitMQ.Stream.Client.IAddressResolver.Enabled.get -> bool
166+
RabbitMQ.Stream.Client.IAddressResolver.Resolve(string address, int host) -> System.Net.EndPoint
150167
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
151168
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
152169
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>

RabbitMQ.Stream.Client/RoutingClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ clientParameters with
8080
// here it means that there is a AddressResolver configuration
8181
// so there is a load-balancer or proxy we need to get the right connection
8282
// as first we try with the first node given from the LB
83-
var endPoint = clientParameters.AddressResolver.EndPoint;
83+
var endPoint = clientParameters.AddressResolver.Resolve(broker.Host, (int)broker.Port);
8484
var client = await routing
8585
.CreateClient(
8686
clientParameters with

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal void Validate()
4040
public IList<EndPoint> Endpoints { get; set; } =
4141
new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };
4242

43-
public AddressResolver AddressResolver { get; set; }
43+
public IAddressResolver AddressResolver { get; set; }
4444
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";
4545

4646
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

0 commit comments

Comments
 (0)