Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 14 additions & 91 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,7 @@ public static void MaybeThrowException(ResponseCode responseCode, string message
}
}

public class AlreadyClosedException : Exception
{
public AlreadyClosedException(string s)
: base(s)
{
}
}
public class AlreadyClosedException(string s) : Exception(s);

public class ProtocolException : Exception
{
Expand All @@ -94,102 +88,31 @@ protected ProtocolException(string s)
}
}

public class LeaderNotFoundException : ProtocolException
{
public LeaderNotFoundException(string s)
: base(s)
{
}
}
public class LeaderNotFoundException(string s) : ProtocolException(s);

public class GenericProtocolException : ProtocolException
{
public GenericProtocolException(ResponseCode responseCode, string s)
: base($"response code: {responseCode} - {s}")
{
}
}
public class GenericProtocolException(ResponseCode responseCode, string s)
: ProtocolException($"response code: {responseCode} - {s}");

public class AuthenticationFailureException : ProtocolException
{
public AuthenticationFailureException(string s)
: base(s)
{
}
}
public class AuthenticationFailureException(string s) : ProtocolException(s);

public class AuthenticationFailureLoopback : ProtocolException
{
public AuthenticationFailureLoopback(string s)
: base(s)
{
}
}
public class AuthenticationFailureLoopback(string s) : ProtocolException(s);

public class VirtualHostAccessFailureException : ProtocolException
{
public VirtualHostAccessFailureException(string s)
: base(s)
{
}
}
public class VirtualHostAccessFailureException(string s) : ProtocolException(s);

public class OffsetNotFoundException : ProtocolException
{
public OffsetNotFoundException(string s)
: base(s)
{
}
}
public class OffsetNotFoundException(string s) : ProtocolException(s);

// RouteNotFoundException the exception for super stream publish
// RouteNotFoundException is raised when the message can't be routed to any stream.
// In this case the user will receive a timeout error and this exception is raised
public class RouteNotFoundException : ProtocolException
{
public RouteNotFoundException(string s)
: base(s)
{
}
}
public class RouteNotFoundException(string s) : ProtocolException(s);

public class AuthMechanismNotSupportedException : Exception
{
public AuthMechanismNotSupportedException(string s)
: base(s)
{
}
}
public class AuthMechanismNotSupportedException(string s) : Exception(s);

public class UnsupportedOperationException : Exception
{
public UnsupportedOperationException(string s)
: base(s)
{
}
}
public class UnsupportedOperationException(string s) : Exception(s);

public class UnknownCommandException : Exception
{
public UnknownCommandException(string s)
: base(s)
{
}
}
public class UnknownCommandException(string s) : Exception(s);

public class TooManyConnectionsException : Exception
{
public TooManyConnectionsException(string s)
: base(s)
{
}
}
public class TooManyConnectionsException(string s) : Exception(s);

public class PendingConnectionsException : Exception
{
public PendingConnectionsException(string s)
: base(s)
{
}
}
public class PendingConnectionsException(string s) : Exception(s);
}
30 changes: 12 additions & 18 deletions RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
{
lock (s_lock)
{
// // we start with the recycle when we reach the max value
var originalList = ids.ToList();
// // in this way we can avoid to recycle the same ids in a short time
ids.Sort();
var l = ids.Where(b => b >= nextId).ToList();
Expand All @@ -152,15 +152,17 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
if (l[^1] != byte.MaxValue)
return (byte)(l[^1] + 1);

for (var i = 0; i < ids.Count - 1; i++)
// let's try to find a free id in the list
originalList.Reverse();
for (byte i = 0; i < byte.MaxValue; i++)
{
if (l[i + 1] - l[i] > 1)
if (!originalList.Contains(i))
{
return (byte)(l[i] + 1);
return i;
}
}

return (byte)(l[^1] + 1);
throw new InvalidOperationException("No more available ids");
}
}

Expand Down Expand Up @@ -237,28 +239,23 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
// do we have a connection for this brokerInfo and with free slots for producer or consumer?
// it does not matter which connection is available
// the important is to have a connection available for the brokerInfo
var connectionItems = Connections.Values.Where(x => x.BrokerInfo == brokerInfo && x.Available).ToList();

if (connectionItems.Any())
{
// ok we have a connection available for this brokerInfo
// let's get the first one
var connectionItem = connectionItems.OrderBy(x => x.EntitiesCount).First();
connectionItem.LastUsed = DateTime.UtcNow;

if (connectionItem.Client is not { IsClosed: true })
return connectionItem.Client;

// the connection is closed
// let's remove it from the pool
// remove closed connection
Connections.TryRemove(connectionItem.Client.ClientId, out _);
// let's create a new one

// create and add new connection item with the new client's id
var newConnectionItem = new ConnectionItem(brokerInfo, _idsPerConnection,
await createClient().ConfigureAwait(false));
Connections.TryAdd(connectionItem.Client.ClientId, connectionItem);
Connections.TryAdd(newConnectionItem.Client.ClientId, newConnectionItem);

return newConnectionItem.Client;
}
Expand All @@ -268,10 +265,7 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
throw new TooManyConnectionsException($"Max connections {_maxConnections} reached");
}

// no connection available for this brokerInfo
// let's create a new one
var client = await createClient().ConfigureAwait(false);
// the connection give us the client id that is a GUID
Connections.TryAdd(client.ClientId, new ConnectionItem(brokerInfo, _idsPerConnection, client));
return client;
}
Expand Down Expand Up @@ -408,7 +402,7 @@ public void RemoveProducerEntityFromStream(string clientId, byte id, string stre
var l = connectionItem.Client.Publishers.Where(x =>
x.Key == id && x.Value.Item1 == stream).ToList();

l.ForEach(x => connectionItem.Client.Consumers.Remove(x.Key));
l.ForEach(x => connectionItem.Client.Publishers.Remove(x.Key));
}
finally
{
Expand Down
21 changes: 21 additions & 0 deletions Tests/ConnectionsPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,27 @@ public void RecycleIdsWhenTheMaxIsReachedAndStartWithAnId()
Assert.Equal(11, nextValidId);
}

[Fact]
public void FindNextValidIdWithAMaxValue()
{
var ids = new List<byte> { 0, 1, 2, 3, 255 };
var v = ConnectionsPool.FindNextValidId(ids, 254);
Assert.Equal(4, v);
}

[Fact]
public void TrowExceptionIfTheArrayIsFull()
{
var ids = new List<byte>();
for (byte i = 0; i < byte.MaxValue; i++)
{
ids.Add(i);
}

ids.Add(255);
Assert.Throws<InvalidOperationException>(() => ConnectionsPool.FindNextValidId(ids));
}

[Fact]
public void ValidatePoolConsistencyWithClosePolicy()
{
Expand Down
4 changes: 2 additions & 2 deletions docs/ReliableClient/BestPracticesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public static async Task Start(Config config)
$"Conf: {totalConfirmed:#,##0.00}, " +
$"Error: {totalError:#,##0.00}, " +
$"Total: {(totalConfirmed + totalError):#,##0.00}, " +
$"Consumed: {totalConsumed:#,##0.00}, " +
$"Sent per stream: {totalSent / streamsList.Count}");
$"Consumed all: {totalConsumed:#,##0.00}, " +
$"Consumed / Consumers : {totalConsumed / config.Consumers:#,##0.00}");
Thread.Sleep(5000);
}
});
Expand Down