From 192e9ec6b2256fd4271b031d2b6a3b3a1d9d9207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaqiu=20LIU=20=E5=88=98=E4=BA=9A=E7=A7=8B?= Date: Thu, 16 Dec 2021 17:47:27 +0800 Subject: [PATCH 1/2] Fix RecursionError because of repeated channel reconnections. --- amqp/channel.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/amqp/channel.py b/amqp/channel.py index 77cfaabf..759c34bd 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -24,6 +24,8 @@ consumer_tag=%r exchange=%r routing_key=%r.\ """ +MAX_RECONNECTIONS = 256 + class VDeprecationWarning(DeprecationWarning): pass @@ -116,6 +118,8 @@ def __init__(self, connection, self.on_open = ensure_promise(on_open) + self.reconnection_count = 0 + # set first time basic_publish_confirm is called # and publisher confirms are enabled for this channel. self._confirm_selected = False @@ -162,6 +166,7 @@ def collect(self): def _do_revive(self): self.is_open = False + self.reconnection_count += 1 self.open() def close(self, reply_code=0, reply_text='', method_sig=(0, 0), @@ -277,6 +282,10 @@ def _on_close(self, reply_code, reply_text, class_id, method_id): """ self.send_method(spec.Channel.CloseOk) if not self.connection.is_closing: + if self.reconnection_count >= MAX_RECONNECTIONS: + raise error_for_code( + reply_code, reply_text, (class_id, method_id), ChannelError, + ) self._do_revive() raise error_for_code( reply_code, reply_text, (class_id, method_id), ChannelError, @@ -445,6 +454,7 @@ def _on_open_ok(self): """ self.is_open = True self.on_open(self) + self.reconnection_count = 0 AMQP_LOGGER.debug('Channel open') ############# From 6c2ab7f18f2580e288ca03085b02de8e0c4b0ab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaqiu=20LIU=20=E5=88=98=E4=BA=9A=E7=A7=8B?= Date: Fri, 17 Dec 2021 13:06:46 +0800 Subject: [PATCH 2/2] Add unit test. --- amqp/channel.py | 2 +- t/unit/test_channel.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/amqp/channel.py b/amqp/channel.py index 759c34bd..c5bf01e5 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -24,7 +24,7 @@ consumer_tag=%r exchange=%r routing_key=%r.\ """ -MAX_RECONNECTIONS = 256 +MAX_RECONNECTIONS = 128 class VDeprecationWarning(DeprecationWarning): diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 79eb951c..af9879ae 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -90,6 +90,18 @@ def test_close(self): assert self.c.is_closing is False assert self.c.connection is None + def test_on_close_exceeds_max_reconnections(self): + assert self.c.reconnection_count == 0 + + def mocked_on_revive(): + self.c.reconnection_count += 1 + self.c._on_close(404, 'text', 50, 61) + self.c._do_revive = Mock(name='_do_revive', side_effect=mocked_on_revive) + with pytest.raises(NotFound): + self.c._on_close(404, 'text', 50, 61) + from amqp.channel import MAX_RECONNECTIONS + assert self.c.reconnection_count == MAX_RECONNECTIONS + def test_on_close(self): self.c._do_revive = Mock(name='_do_revive') with pytest.raises(NotFound):