@@ -107,20 +107,16 @@ public class Client : IClient
107107
108108 private uint correlationId = 0 ; // allow for some pre-amble
109109
110- private byte nextPublisherId = 0 ;
111-
112110 private Connection connection ;
113111
114- private readonly IDictionary < byte , ( Action < ReadOnlyMemory < ulong > > , Action < ( ulong , ResponseCode ) [ ] > ) >
115- publishers =
116- new ConcurrentDictionary < byte , ( Action < ReadOnlyMemory < ulong > > , Action < ( ulong , ResponseCode ) [ ] > ) > ( ) ;
117-
118112 private readonly ConcurrentDictionary < uint , IValueTaskSource > requests = new ( ) ;
119113
120114 private readonly TaskCompletionSource < TuneResponse > tuneReceived =
121115 new TaskCompletionSource < TuneResponse > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
122116
123- private byte nextSubscriptionId ;
117+ internal readonly IDictionary < byte , ( Action < ReadOnlyMemory < ulong > > , Action < ( ulong , ResponseCode ) [ ] > ) >
118+ publishers =
119+ new ConcurrentDictionary < byte , ( Action < ReadOnlyMemory < ulong > > , Action < ( ulong , ResponseCode ) [ ] > ) > ( ) ;
124120
125121 internal readonly IDictionary < byte , ConsumerEvents > consumers =
126122 new ConcurrentDictionary < byte , ConsumerEvents > ( ) ;
@@ -157,17 +153,12 @@ private Client(ClientParameters parameters, ILogger logger = null)
157153 ( int ) parameters . Heartbeat . TotalSeconds ) ;
158154 IsClosed = false ;
159155 _logger = logger ?? NullLogger . Instance ;
160- }
161-
162- private byte GetNextSubscriptionId ( )
163- {
164- byte result ;
165- lock ( Obj )
156+ ClientId = Guid . NewGuid ( ) . ToString ( ) ;
157+ AppDomain . CurrentDomain . UnhandledException += ( sender , args ) =>
166158 {
167- result = nextSubscriptionId ++ ;
168- }
169-
170- return result ;
159+ _logger . LogError ( args . ExceptionObject as Exception , "Unhandled exception" ) ;
160+ Parameters . UnhandledExceptionHandler ( args . ExceptionObject as Exception ) ;
161+ } ;
171162 }
172163
173164 public bool IsClosed
@@ -304,16 +295,36 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
304295 public async Task < ( byte , DeclarePublisherResponse ) > DeclarePublisher ( string publisherRef ,
305296 string stream ,
306297 Action < ReadOnlyMemory < ulong > > confirmCallback ,
307- Action < ( ulong , ResponseCode ) [ ] > errorCallback )
298+ Action < ( ulong , ResponseCode ) [ ] > errorCallback , ConnectionsPool pool = null )
308299 {
309- var publisherId = nextPublisherId ++ ;
310- publishers . Add ( publisherId , ( confirmCallback , errorCallback ) ) ;
311- return ( publisherId , await Request < DeclarePublisherRequest , DeclarePublisherResponse > ( corr =>
312- new DeclarePublisherRequest ( corr , publisherId , publisherRef , stream ) ) . ConfigureAwait ( false ) ) ;
300+ await _poolSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
301+ var publisherId = ConnectionsPool . FindNextValidId ( publishers . Keys . ToList ( ) ) ;
302+ DeclarePublisherResponse response ;
303+
304+ try
305+ {
306+ publishers . Add ( publisherId , ( confirmCallback , errorCallback ) ) ;
307+ response = await Request < DeclarePublisherRequest , DeclarePublisherResponse > ( corr =>
308+ new DeclarePublisherRequest ( corr , publisherId , publisherRef , stream ) ) . ConfigureAwait ( false ) ;
309+ }
310+ finally
311+ {
312+ _poolSemaphore . Release ( ) ;
313+ }
314+
315+ if ( response . ResponseCode == ResponseCode . Ok || pool == null )
316+ return ( publisherId , response ) ;
317+
318+ // if the response code is not ok we need to remove the subscription
319+ // and close the connection if necessary.
320+ publishers . Remove ( publisherId ) ;
321+ await MaybeClose ( "Create Publisher Exception" , stream , pool ) . ConfigureAwait ( false ) ;
322+ return ( publisherId , response ) ;
313323 }
314324
315325 public async Task < DeletePublisherResponse > DeletePublisher ( byte publisherId )
316326 {
327+ await _poolSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
317328 try
318329 {
319330 var result =
@@ -325,6 +336,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
325336 finally
326337 {
327338 publishers . Remove ( publisherId ) ;
339+ _poolSemaphore . Release ( ) ;
328340 }
329341 }
330342
@@ -345,21 +357,38 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
345357 Dictionary < string , string > properties , Func < Deliver , Task > deliverHandler ,
346358 Func < bool , Task < IOffsetType > > consumerUpdateHandler )
347359 {
348- var subscriptionId = GetNextSubscriptionId ( ) ;
349-
350- consumers . Add ( subscriptionId ,
351- new ConsumerEvents (
352- deliverHandler ,
353- consumerUpdateHandler ) ) ;
360+ await _poolSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
361+ var subscriptionId = ConnectionsPool . FindNextValidId ( consumers . Keys . ToList ( ) ) ;
362+ SubscribeResponse response ;
363+ try
364+ {
365+ consumers . Add ( subscriptionId ,
366+ new ConsumerEvents (
367+ deliverHandler ,
368+ consumerUpdateHandler ) ) ;
354369
355- return ( subscriptionId ,
356- await Request < SubscribeRequest , SubscribeResponse > ( corr =>
370+ response = await Request < SubscribeRequest , SubscribeResponse > ( corr =>
357371 new SubscribeRequest ( corr , subscriptionId , config . Stream , config . OffsetSpec , initialCredit ,
358- properties ) ) . ConfigureAwait ( false ) ) ;
372+ properties ) ) . ConfigureAwait ( false ) ;
373+ }
374+ finally
375+ {
376+ _poolSemaphore . Release ( ) ;
377+ }
378+
379+ if ( response . ResponseCode == ResponseCode . Ok )
380+ return ( subscriptionId , response ) ;
381+
382+ // if the response code is not ok we need to remove the subscription
383+ // and close the connection if necessary.
384+ consumers . Remove ( subscriptionId ) ;
385+ await MaybeClose ( "Create Consumer Exception" , config . Stream , config . Pool ) . ConfigureAwait ( false ) ;
386+ return ( subscriptionId , response ) ;
359387 }
360388
361389 public async Task < UnsubscribeResponse > Unsubscribe ( byte subscriptionId )
362390 {
391+ await _poolSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
363392 try
364393 {
365394 // here we reduce a bit the timeout to avoid waiting too much
@@ -377,6 +406,7 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
377406 _logger . LogDebug ( "Unsubscribe: {SubscriptionId}" , subscriptionId ) ;
378407 // remove consumer after RPC returns, this should avoid uncorrelated data being sent
379408 consumers . Remove ( subscriptionId ) ;
409+ _poolSemaphore . Release ( ) ;
380410 }
381411 }
382412
@@ -640,6 +670,14 @@ private void InternalClose()
640670 IsClosed = true ;
641671 }
642672
673+ private bool HasEntities ( )
674+ {
675+ lock ( Obj )
676+ {
677+ return publishers . Count > 0 || consumers . Count > 0 ;
678+ }
679+ }
680+
643681 private async ValueTask < bool > ConsumerUpdateResponse ( uint rCorrelationId , IOffsetType offsetSpecification )
644682 {
645683 return await Publish ( new ConsumerUpdateRequest ( rCorrelationId , offsetSpecification ) ) . ConfigureAwait ( false ) ;
@@ -652,6 +690,7 @@ public async Task<CloseResponse> Close(string reason)
652690 return new CloseResponse ( 0 , ResponseCode . Ok ) ;
653691 }
654692
693+ InternalClose ( ) ;
655694 try
656695 {
657696 var result =
@@ -671,28 +710,62 @@ public async Task<CloseResponse> Close(string reason)
671710 }
672711 finally
673712 {
674- // even if the close fails we need to close the connection
675- InternalClose ( ) ;
676713 connection . Dispose ( ) ;
677714 }
678715
679716 return new CloseResponse ( 0 , ResponseCode . Ok ) ;
680717 }
681718
719+ // _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
720+ // the MaybeClose can be called in different threads so we need to protect the pool
721+ // the pool itself is thread safe but we need to protect the flow to be sure that the
722+ // connection is released only once
723+ private readonly SemaphoreSlim _poolSemaphore = new ( 1 , 1 ) ;
724+
682725 // Safe close
683- // the client can be closed only if the publishers are == 0
684- // not a public method used internally by producers and consumers
685- internal async Task < CloseResponse > MaybeClose ( string reason )
726+ // the client can be closed only if HasEntities is false
727+ // if the client has entities (publishers or consumers) it will be released from the pool
728+ // Release will decrement the active ids for the connection
729+ // if the active ids are 0 the connection will be closed
730+
731+ internal async Task < CloseResponse > MaybeClose ( string reason , string stream , ConnectionsPool pool )
686732 {
687- if ( publishers . Count == 0 && consumers . Count == 0 )
733+ await _poolSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
734+ try
688735 {
689- await Close ( reason ) . ConfigureAwait ( false ) ;
690- }
736+ if ( ! HasEntities ( ) )
737+ {
738+ if ( ! string . IsNullOrEmpty ( ClientId ) )
739+ {
740+ _logger . LogInformation ( "Close connection for the {ClientId}" , ClientId ) ;
741+ // the client can be closed in an unexpected way so we need to remove it from the pool
742+ // so you will find pool.remove(ClientId) also to the disconnect event
743+ // pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
744+ pool . Remove ( ClientId ) ;
745+ await Close ( reason ) . ConfigureAwait ( false ) ;
746+ }
747+ }
748+ else
749+ {
750+ // we remove an id reference from the client
751+ // in case there are still active ids from the client and the stream
752+ if ( ! string . IsNullOrEmpty ( ClientId ) )
753+ {
754+ pool . Release ( ClientId , stream ) ;
755+ }
756+ }
691757
692- var result = new CloseResponse ( 0 , ResponseCode . Ok ) ;
693- return result ;
758+ var result = new CloseResponse ( 0 , ResponseCode . Ok ) ;
759+ return result ;
760+ }
761+ finally
762+ {
763+ _poolSemaphore . Release ( ) ;
764+ }
694765 }
695766
767+ public string ClientId { get ; init ; }
768+
696769 public async ValueTask < QueryPublisherResponse > QueryPublisherSequence ( string publisherRef , string stream )
697770 {
698771 return await Request < QueryPublisherRequest , QueryPublisherResponse > ( corr =>
0 commit comments