Skip to content

Commit 3565e17

Browse files
Joakim Anderssonlukebakken
authored andcommitted
Properly cancel token passed to AsyncEventingBasicConsumer receiver event on channel close
Fixes #1787
1 parent 44d8be3 commit 3565e17

File tree

5 files changed

+19
-10
lines changed

5 files changed

+19
-10
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ protected override async Task ProcessChannelAsync()
3333
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
3434
{
3535
await work.Consumer.HandleBasicDeliverAsync(
36-
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
36+
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37+
work.Exchange!, work.RoutingKey!, work.BasicProperties!,
38+
work.Body.Memory,
39+
work.Consumer.Channel?.ChannelCancellationToken ?? default)
3840
.ConfigureAwait(false);
3941
}
4042
break;

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,5 +450,10 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
450450
/// timing out.
451451
/// </summary>
452452
TimeSpan ContinuationTimeout { get; set; }
453+
454+
/// <summary>
455+
/// The <see cref="CancellationToken"/> associated with channel closure.
456+
/// </summary>
457+
CancellationToken ChannelCancellationToken { get; }
453458
}
454459
}

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public IEnumerable<string> ConsumerTags
136136
public int ChannelNumber => InnerChannel.ChannelNumber;
137137

138138
public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason;
139+
public CancellationToken ChannelCancellationToken => InnerChannel.ChannelCancellationToken;
139140

140141
public IAsyncBasicConsumer? DefaultConsumer
141142
{

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ internal partial class Channel : IChannel, IRecoverable
6666
private bool _disposed;
6767
private int _isDisposing;
6868

69+
private CancellationTokenSource _closeAsyncCts = new CancellationTokenSource();
70+
6971
public Channel(ISession session, CreateChannelOptions createChannelOptions)
7072
{
7173
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
@@ -85,6 +87,8 @@ public Channel(ISession session, CreateChannelOptions createChannelOptions)
8587
Session = session;
8688
}
8789

90+
public CancellationToken ChannelCancellationToken => _closeAsyncCts.Token;
91+
8892
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
8993

9094
public TimeSpan ContinuationTimeout { get; set; }
@@ -208,13 +212,6 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
208212
public async Task CloseAsync(ShutdownEventArgs args, bool abort,
209213
CancellationToken cancellationToken)
210214
{
211-
CancellationToken argCancellationToken = cancellationToken;
212-
if (IsOpen)
213-
{
214-
// Note: we really do need to try and close this channel!
215-
cancellationToken = CancellationToken.None;
216-
}
217-
218215
bool enqueued = false;
219216
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
220217

@@ -236,6 +233,8 @@ await ModelSendAsync(in method, k.CancellationToken)
236233

237234
AssertResultIsTrue(await k);
238235

236+
_closeAsyncCts.Cancel();
237+
239238
await ConsumerDispatcher.WaitForShutdownAsync()
240239
.ConfigureAwait(false);
241240
}
@@ -265,7 +264,7 @@ await ConsumerDispatcher.WaitForShutdownAsync()
265264
MaybeDisposeContinuation(enqueued, k);
266265
_rpcSemaphore.Release();
267266
ChannelShutdownAsync -= k.OnConnectionShutdownAsync;
268-
argCancellationToken.ThrowIfCancellationRequested();
267+
cancellationToken.ThrowIfCancellationRequested();
269268
}
270269
}
271270

@@ -591,6 +590,7 @@ protected virtual void Dispose(bool disposing)
591590
{
592591
_rpcSemaphore.Dispose();
593592
_confirmSemaphore.Dispose();
593+
_closeAsyncCts.Dispose();
594594
}
595595
catch
596596
{

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong p
55
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort
66
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string!
77
RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string!
8+
RabbitMQ.Client.IChannel.ChannelCancellationToken.get -> System.Threading.CancellationToken
89
RabbitMQ.Client.RabbitMQTracingOptions
910
RabbitMQ.Client.RabbitMQTracingOptions.RabbitMQTracingOptions() -> void
1011
RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.get -> bool

0 commit comments

Comments
 (0)