Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/AddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace RabbitMQ.Stream.Client
{
public class AddressResolver
public class AddressResolver : IAddressResolver
{
public AddressResolver(EndPoint endPoint)
{
Expand All @@ -16,5 +16,6 @@ public AddressResolver(EndPoint endPoint)

public EndPoint EndPoint { get; set; }
public bool Enabled { get; set; }
public EndPoint Resolve(string address, int host) => EndPoint;
}
}
22 changes: 22 additions & 0 deletions RabbitMQ.Stream.Client/AddressResolverDynamic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Net;

namespace RabbitMQ.Stream.Client;

public class AddressResolverDynamic : IAddressResolver
{
private readonly Func<string, int, EndPoint> _resolveFunction;

public AddressResolverDynamic(Func<string, int, EndPoint> resolveFunction)
{
_resolveFunction = resolveFunction;
Enabled = true;
}

public bool Enabled { get; set; }
public EndPoint Resolve(string address, int host) => _resolveFunction(address, host);
}
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public string ClientProvidedName
/// </summary>
public SslOption Ssl { get; set; } = new SslOption();

public AddressResolver AddressResolver { get; set; } = null;
public IAddressResolver AddressResolver { get; set; } = null;

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

Expand Down
13 changes: 13 additions & 0 deletions RabbitMQ.Stream.Client/IAddressResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Net;

namespace RabbitMQ.Stream.Client;

public interface IAddressResolver
{
public bool Enabled { get; }
public EndPoint Resolve(string address, int host);
}
10 changes: 1 addition & 9 deletions RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,6 @@ override RabbitMQ.Stream.Client.Reliable.Producer.Close() -> System.Threading.Ta
override RabbitMQ.Stream.Client.Reliable.Producer.ToString() -> string
RabbitMQ.Stream.Client.AbstractEntity
RabbitMQ.Stream.Client.AbstractEntity.AbstractEntity() -> void
RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void
RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool
RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void
RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint
RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void
RabbitMQ.Stream.Client.AMQP.AmqpParseException
RabbitMQ.Stream.Client.AMQP.AmqpParseException.AmqpParseException(string s) -> void
RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting
Expand Down Expand Up @@ -187,8 +181,6 @@ RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.set -> void
RabbitMQ.Stream.Client.ClientParameters.Endpoint.get -> System.Net.EndPoint
Expand Down Expand Up @@ -724,7 +716,7 @@ RabbitMQ.Stream.Client.StreamSystem.QueryPartition(string superStream) -> System
RabbitMQ.Stream.Client.StreamSystem.QuerySequence(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
RabbitMQ.Stream.Client.StreamSystem.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.StreamSystemConfig
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver
RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.set -> void
RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.set -> void
Expand Down
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityComm
RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.AbstractEntity.UpdateStatusToClosed() -> void
RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void
RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool
RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void
RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint
RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void
RabbitMQ.Stream.Client.AddressResolver.Resolve(string address, int host) -> System.Net.EndPoint
RabbitMQ.Stream.Client.AddressResolverDynamic
RabbitMQ.Stream.Client.AddressResolverDynamic.AddressResolverDynamic(System.Func<string, int, System.Net.EndPoint> resolveFunction) -> void
RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.get -> bool
RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.set -> void
RabbitMQ.Stream.Client.AddressResolverDynamic.Resolve(string address, int host) -> System.Net.EndPoint
RabbitMQ.Stream.Client.AlreadyClosedException
RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void
RabbitMQ.Stream.Client.AuthMechanism
Expand Down Expand Up @@ -53,6 +65,8 @@ RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IO
RabbitMQ.Stream.Client.Client.SuperStreamExists(string stream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
Expand Down Expand Up @@ -147,6 +161,9 @@ RabbitMQ.Stream.Client.FlowControl.Strategy.get -> RabbitMQ.Stream.Client.Consum
RabbitMQ.Stream.Client.FlowControl.Strategy.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
RabbitMQ.Stream.Client.IAddressResolver
RabbitMQ.Stream.Client.IAddressResolver.Enabled.get -> bool
RabbitMQ.Stream.Client.IAddressResolver.Resolve(string address, int host) -> System.Net.EndPoint
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ clientParameters with
// here it means that there is a AddressResolver configuration
// so there is a load-balancer or proxy we need to get the right connection
// as first we try with the first node given from the LB
var endPoint = clientParameters.AddressResolver.EndPoint;
var endPoint = clientParameters.AddressResolver.Resolve(broker.Host, (int)broker.Port);
var client = await routing
.CreateClient(
clientParameters with
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal void Validate()
public IList<EndPoint> Endpoints { get; set; } =
new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };

public AddressResolver AddressResolver { get; set; }
public IAddressResolver AddressResolver { get; set; }
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
Expand Down
Loading