diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 95e2db59..48947522 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -78,13 +78,7 @@ public static void MaybeThrowException(ResponseCode responseCode, string message } } - public class AlreadyClosedException : Exception - { - public AlreadyClosedException(string s) - : base(s) - { - } - } + public class AlreadyClosedException(string s) : Exception(s); public class ProtocolException : Exception { @@ -94,102 +88,31 @@ protected ProtocolException(string s) } } - public class LeaderNotFoundException : ProtocolException - { - public LeaderNotFoundException(string s) - : base(s) - { - } - } + public class LeaderNotFoundException(string s) : ProtocolException(s); - public class GenericProtocolException : ProtocolException - { - public GenericProtocolException(ResponseCode responseCode, string s) - : base($"response code: {responseCode} - {s}") - { - } - } + public class GenericProtocolException(ResponseCode responseCode, string s) + : ProtocolException($"response code: {responseCode} - {s}"); - public class AuthenticationFailureException : ProtocolException - { - public AuthenticationFailureException(string s) - : base(s) - { - } - } + public class AuthenticationFailureException(string s) : ProtocolException(s); - public class AuthenticationFailureLoopback : ProtocolException - { - public AuthenticationFailureLoopback(string s) - : base(s) - { - } - } + public class AuthenticationFailureLoopback(string s) : ProtocolException(s); - public class VirtualHostAccessFailureException : ProtocolException - { - public VirtualHostAccessFailureException(string s) - : base(s) - { - } - } + public class VirtualHostAccessFailureException(string s) : ProtocolException(s); - public class OffsetNotFoundException : ProtocolException - { - public OffsetNotFoundException(string s) - : base(s) - { - } - } + public class OffsetNotFoundException(string s) : ProtocolException(s); // RouteNotFoundException the exception for super stream publish // RouteNotFoundException is raised when the message can't be routed to any stream. // In this case the user will receive a timeout error and this exception is raised - public class RouteNotFoundException : ProtocolException - { - public RouteNotFoundException(string s) - : base(s) - { - } - } + public class RouteNotFoundException(string s) : ProtocolException(s); - public class AuthMechanismNotSupportedException : Exception - { - public AuthMechanismNotSupportedException(string s) - : base(s) - { - } - } + public class AuthMechanismNotSupportedException(string s) : Exception(s); - public class UnsupportedOperationException : Exception - { - public UnsupportedOperationException(string s) - : base(s) - { - } - } + public class UnsupportedOperationException(string s) : Exception(s); - public class UnknownCommandException : Exception - { - public UnknownCommandException(string s) - : base(s) - { - } - } + public class UnknownCommandException(string s) : Exception(s); - public class TooManyConnectionsException : Exception - { - public TooManyConnectionsException(string s) - : base(s) - { - } - } + public class TooManyConnectionsException(string s) : Exception(s); - public class PendingConnectionsException : Exception - { - public PendingConnectionsException(string s) - : base(s) - { - } - } + public class PendingConnectionsException(string s) : Exception(s); } diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 0a37b535..70fd5a79 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -136,7 +136,7 @@ internal static byte FindNextValidId(List ids, byte nextId = 0) { lock (s_lock) { - // // we start with the recycle when we reach the max value + var originalList = ids.ToList(); // // in this way we can avoid to recycle the same ids in a short time ids.Sort(); var l = ids.Where(b => b >= nextId).ToList(); @@ -152,15 +152,17 @@ internal static byte FindNextValidId(List ids, byte nextId = 0) if (l[^1] != byte.MaxValue) return (byte)(l[^1] + 1); - for (var i = 0; i < ids.Count - 1; i++) + // let's try to find a free id in the list + originalList.Reverse(); + for (byte i = 0; i < byte.MaxValue; i++) { - if (l[i + 1] - l[i] > 1) + if (!originalList.Contains(i)) { - return (byte)(l[i] + 1); + return i; } } - return (byte)(l[^1] + 1); + throw new InvalidOperationException("No more available ids"); } } @@ -237,28 +239,23 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.BrokerInfo == brokerInfo && x.Available).ToList(); if (connectionItems.Any()) { - // ok we have a connection available for this brokerInfo - // let's get the first one var connectionItem = connectionItems.OrderBy(x => x.EntitiesCount).First(); connectionItem.LastUsed = DateTime.UtcNow; if (connectionItem.Client is not { IsClosed: true }) return connectionItem.Client; - // the connection is closed - // let's remove it from the pool + // remove closed connection Connections.TryRemove(connectionItem.Client.ClientId, out _); - // let's create a new one + + // create and add new connection item with the new client's id var newConnectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false)); - Connections.TryAdd(connectionItem.Client.ClientId, connectionItem); + Connections.TryAdd(newConnectionItem.Client.ClientId, newConnectionItem); return newConnectionItem.Client; } @@ -268,10 +265,7 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.Key == id && x.Value.Item1 == stream).ToList(); - l.ForEach(x => connectionItem.Client.Consumers.Remove(x.Key)); + l.ForEach(x => connectionItem.Client.Publishers.Remove(x.Key)); } finally { diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs index 8ec913d2..6e95380f 100644 --- a/Tests/ConnectionsPoolTests.cs +++ b/Tests/ConnectionsPoolTests.cs @@ -905,6 +905,27 @@ public void RecycleIdsWhenTheMaxIsReachedAndStartWithAnId() Assert.Equal(11, nextValidId); } + [Fact] + public void FindNextValidIdWithAMaxValue() + { + var ids = new List { 0, 1, 2, 3, 255 }; + var v = ConnectionsPool.FindNextValidId(ids, 254); + Assert.Equal(4, v); + } + + [Fact] + public void TrowExceptionIfTheArrayIsFull() + { + var ids = new List(); + for (byte i = 0; i < byte.MaxValue; i++) + { + ids.Add(i); + } + + ids.Add(255); + Assert.Throws(() => ConnectionsPool.FindNextValidId(ids)); + } + [Fact] public void ValidatePoolConsistencyWithClosePolicy() { diff --git a/docs/ReliableClient/BestPracticesClient.cs b/docs/ReliableClient/BestPracticesClient.cs index ca97ec7a..753186f3 100644 --- a/docs/ReliableClient/BestPracticesClient.cs +++ b/docs/ReliableClient/BestPracticesClient.cs @@ -146,8 +146,8 @@ public static async Task Start(Config config) $"Conf: {totalConfirmed:#,##0.00}, " + $"Error: {totalError:#,##0.00}, " + $"Total: {(totalConfirmed + totalError):#,##0.00}, " + - $"Consumed: {totalConsumed:#,##0.00}, " + - $"Sent per stream: {totalSent / streamsList.Count}"); + $"Consumed all: {totalConsumed:#,##0.00}, " + + $"Consumed / Consumers : {totalConsumed / config.Consumers:#,##0.00}"); Thread.Sleep(5000); } });