diff --git a/.style.yapf b/.style.yapf index 2c5517e..38fa03d 100644 --- a/.style.yapf +++ b/.style.yapf @@ -45,7 +45,7 @@ blank_line_before_nested_class_or_def=False coalesce_brackets=False # The column limit. -column_limit=79 +column_limit=99 # Indent width used for line continuations. continuation_indent_width=4 diff --git a/tests/test_basic.py b/tests/test_basic.py index 4be3e3e..871d198 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -33,9 +33,7 @@ async def test_basic_qos_prefetch_size(self): async with amqp.new_channel() as channel: with pytest.raises(exceptions.ChannelClosed) as cm: await channel.basic_qos( - prefetch_size=10, - prefetch_count=100, - connection_global=False + prefetch_size=10, prefetch_count=100, connection_global=False ) assert cm.value.code == 540 @@ -48,9 +46,7 @@ async def test_basic_qos_wrong_values(self): async with amqp.new_channel() as channel: with pytest.raises(struct.error): await channel.basic_qos( - prefetch_size=100000, - prefetch_count=1000000000, - connection_global=False + prefetch_size=100000, prefetch_count=1000000000, connection_global=False ) @@ -68,9 +64,7 @@ async def callback(channel, body, envelope, _properties): result = await channel.basic_consume(callback, queue_name=queue_name) result = await channel.basic_cancel(result['consumer_tag']) - result = await channel.publish( - "payload", exchange_name, routing_key='' - ) + result = await channel.publish("payload", exchange_name, routing_key='') await trio.sleep(1) result = await channel.queue_declare(queue_name, passive=True) @@ -92,20 +86,15 @@ async def test_basic_get(self, channel): await channel.queue_declare(queue_name) await channel.exchange_declare(exchange_name, type_name='direct') - await channel.queue_bind( - queue_name, exchange_name, routing_key=routing_key - ) + await channel.queue_bind(queue_name, exchange_name, routing_key=routing_key) - await channel.publish( - "payload", exchange_name, routing_key=routing_key - ) + await channel.publish("payload", exchange_name, routing_key=routing_key) result = await channel.basic_get(queue_name) assert result['routing_key'] == routing_key assert not result['redelivered'] assert 'delivery_tag' in result - assert result['exchange_name' - ] == channel.protocol.full_name(exchange_name) + assert result['exchange_name'] == channel.protocol.full_name(exchange_name) assert result['message'] == b'payload' assert isinstance(result['properties'], properties.Properties) @@ -116,26 +105,18 @@ async def test_basic_get_empty(self, channel): routing_key = '' await channel.queue_declare(queue_name) await channel.exchange_declare(exchange_name, type_name='direct') - await channel.queue_bind( - queue_name, exchange_name, routing_key=routing_key - ) + await channel.queue_bind(queue_name, exchange_name, routing_key=routing_key) with pytest.raises(exceptions.EmptyQueue): await channel.basic_get(queue_name) class TestBasicDelivery(testcase.RabbitTestCase): - async def publish( - self, amqp, queue_name, exchange_name, routing_key, payload - ): + async def publish(self, amqp, queue_name, exchange_name, routing_key, payload): async with amqp.new_channel() as channel: - await channel.queue_declare( - queue_name, exclusive=False, no_wait=False - ) + await channel.queue_declare(queue_name, exclusive=False, no_wait=False) await channel.exchange_declare(exchange_name, type_name='fanout') - await channel.queue_bind( - queue_name, exchange_name, routing_key=routing_key - ) + await channel.queue_bind(queue_name, exchange_name, routing_key=routing_key) await channel.publish(payload, exchange_name, queue_name) @pytest.mark.trio @@ -144,9 +125,7 @@ async def test_ack_message(self, amqp): exchange_name = 'exchange_name' routing_key = '' - await self.publish( - amqp, queue_name, exchange_name, routing_key, "payload" - ) + await self.publish(amqp, queue_name, exchange_name, routing_key, "payload") qfuture = trio.Event() @@ -167,9 +146,7 @@ async def test_basic_nack(self, amqp): exchange_name = 'exchange_name' routing_key = '' - await self.publish( - amqp, queue_name, exchange_name, routing_key, "payload" - ) + await self.publish(amqp, queue_name, exchange_name, routing_key, "payload") qfuture = trio.Event() @@ -190,18 +167,14 @@ async def test_basic_nack_norequeue(self, amqp): exchange_name = 'exchange_name' routing_key = '' - await self.publish( - amqp, queue_name, exchange_name, routing_key, "payload" - ) + await self.publish(amqp, queue_name, exchange_name, routing_key, "payload") qfuture = trio.Event() async with amqp.new_channel() as channel: async def qcallback(channel, body, envelope, _properties): - await channel.basic_client_nack( - envelope.delivery_tag, requeue=False - ) + await channel.basic_client_nack(envelope.delivery_tag, requeue=False) qfuture.set() await channel.basic_consume(qcallback, queue_name=queue_name) @@ -213,9 +186,7 @@ async def test_basic_nack_requeue(self, amqp): exchange_name = 'exchange_name' routing_key = '' - await self.publish( - amqp, queue_name, exchange_name, routing_key, "payload" - ) + await self.publish(amqp, queue_name, exchange_name, routing_key, "payload") qfuture = trio.Event() called = False @@ -226,9 +197,7 @@ async def qcallback(channel, body, envelope, _properties): nonlocal called if not called: called = True - await channel.basic_client_nack( - envelope.delivery_tag, requeue=True - ) + await channel.basic_client_nack(envelope.delivery_tag, requeue=True) else: await channel.basic_client_ack(envelope.delivery_tag) qfuture.set() @@ -241,9 +210,7 @@ async def test_basic_reject(self, amqp): queue_name = 'queue_name' exchange_name = 'exchange_name' routing_key = '' - await self.publish( - amqp, queue_name, exchange_name, routing_key, "payload" - ) + await self.publish(amqp, queue_name, exchange_name, routing_key, "payload") qfuture = trio.Event() diff --git a/tests/test_channel.py b/tests/test_channel.py index 58e72a9..bd07021 100644 --- a/tests/test_channel.py +++ b/tests/test_channel.py @@ -55,8 +55,7 @@ async def test_channel_active_flow(self, channel): assert result['active'] @pytest.mark.skipif( - IMPLEMENT_CHANNEL_FLOW is False, - reason="active=false is not implemented in RabbitMQ" + IMPLEMENT_CHANNEL_FLOW is False, reason="active=false is not implemented in RabbitMQ" ) @pytest.mark.trio async def test_channel_inactive_flow(self, channel): @@ -71,8 +70,7 @@ async def test_channel_active_flow_twice(self, channel): result = await channel.flow(active=True) @pytest.mark.skipif( - IMPLEMENT_CHANNEL_FLOW is False, - reason="active=false is not implemented in RabbitMQ" + IMPLEMENT_CHANNEL_FLOW is False, reason="active=false is not implemented in RabbitMQ" ) @pytest.mark.trio async def test_channel_active_inactive_flow(self, channel): diff --git a/tests/test_consume.py b/tests/test_consume.py index 98b8c88..bc3c374 100644 --- a/tests/test_consume.py +++ b/tests/test_consume.py @@ -35,9 +35,7 @@ def badcallback(): with pytest.raises(TypeError): async with proto as amqp: async with amqp.new_channel() as chan: - await chan.queue_declare( - "q", exclusive=True, no_wait=False - ) + await chan.queue_declare("q", exclusive=True, no_wait=False) await chan.exchange_declare("e", "fanout") await chan.queue_bind("q", "e", routing_key='') @@ -54,9 +52,7 @@ def badcallback(): await self.check_messages(amqp, "q", 1) # start consume - await channel.basic_consume( - badcallback, queue_name="q" - ) + await channel.basic_consume(badcallback, queue_name="q") await trio.sleep(1) @pytest.mark.trio @@ -138,13 +134,9 @@ async def q2_callback(channel, body, envelope, properties): q2_future.set() # start consumers - result = await channel.basic_consume( - q1_callback, queue_name="q1" - ) + result = await channel.basic_consume(q1_callback, queue_name="q1") ctag_q1 = result['consumer_tag'] - result = await channel.basic_consume( - q2_callback, queue_name="q2" - ) + result = await channel.basic_consume(q2_callback, queue_name="q2") ctag_q2 = result['consumer_tag'] # put message in q1 @@ -172,14 +164,10 @@ async def q2_callback(channel, body, envelope, properties): async def test_duplicate_consumer_tag(self, channel): await channel.queue_declare("q1", exclusive=True, no_wait=False) await channel.queue_declare("q2", exclusive=True, no_wait=False) - await channel.basic_consume( - self.callback, queue_name="q1", consumer_tag='tag' - ) + await channel.basic_consume(self.callback, queue_name="q1", consumer_tag='tag') with pytest.raises(exceptions.ChannelClosed) as cm: - await channel.basic_consume( - self.callback, queue_name="q2", consumer_tag='tag' - ) + await channel.basic_consume(self.callback, queue_name="q2", consumer_tag='tag') assert cm.value.code == 530 diff --git a/tests/test_exchange.py b/tests/test_exchange.py index d463a95..53b9fff 100644 --- a/tests/test_exchange.py +++ b/tests/test_exchange.py @@ -14,74 +14,49 @@ class TestExchangeDeclare(testcase.RabbitTestCase): @pytest.mark.trio async def test_exchange_direct_declare(self, channel): - result = await channel.exchange_declare( - 'exchange_name', type_name='direct' - ) + result = await channel.exchange_declare('exchange_name', type_name='direct') assert result @pytest.mark.trio async def test_exchange_fanout_declare(self, channel): - result = await channel.exchange_declare( - 'exchange_name', type_name='fanout' - ) + result = await channel.exchange_declare('exchange_name', type_name='fanout') assert result @pytest.mark.trio async def test_exchange_topic_declare(self, channel): - result = await channel.exchange_declare( - 'exchange_name', type_name='topic' - ) + result = await channel.exchange_declare('exchange_name', type_name='topic') assert result @pytest.mark.trio async def test_exchange_headers_declare(self, channel): - result = await channel.exchange_declare( - 'exchange_name', type_name='headers' - ) + result = await channel.exchange_declare('exchange_name', type_name='headers') assert result @pytest.mark.trio async def test_exchange_declare_wrong_types(self, channel): result = await channel.exchange_declare( - 'exchange_name', - type_name='headers', - auto_delete=True, - durable=True + 'exchange_name', type_name='headers', auto_delete=True, durable=True ) assert result with pytest.raises(exceptions.ChannelClosed): result = await channel.exchange_declare( - 'exchange_name', - type_name='fanout', - auto_delete=False, - durable=False + 'exchange_name', type_name='fanout', auto_delete=False, durable=False ) @pytest.mark.trio async def test_exchange_declare_passive(self, channel): result = await channel.exchange_declare( - 'exchange_name', - type_name='headers', - auto_delete=True, - durable=True + 'exchange_name', type_name='headers', auto_delete=True, durable=True ) assert result result = await channel.exchange_declare( - 'exchange_name', - type_name='headers', - auto_delete=True, - durable=True, - passive=True + 'exchange_name', type_name='headers', auto_delete=True, durable=True, passive=True ) assert result result = await channel.exchange_declare( - 'exchange_name', - type_name='headers', - auto_delete=False, - durable=False, - passive=True + 'exchange_name', type_name='headers', auto_delete=False, durable=False, passive=True ) assert result @@ -117,9 +92,7 @@ async def test_delete(self, channel): result = await channel.exchange_delete(exchange_name) assert result with pytest.raises(exceptions.ChannelClosed) as cm: - await channel.exchange_declare( - exchange_name, type_name='direct', passive=True - ) + await channel.exchange_declare(exchange_name, type_name='direct', passive=True) assert cm.value.code == 404 @@ -144,9 +117,7 @@ async def test_double_delete(self, channel): class TestExchangeBind(testcase.RabbitTestCase): @pytest.mark.trio async def test_exchange_bind(self, channel): - await channel.exchange_declare( - 'exchange_destination', type_name='direct' - ) + await channel.exchange_declare('exchange_destination', type_name='direct') await channel.exchange_declare('exchange_source', type_name='direct') result = await channel.exchange_bind( @@ -158,9 +129,7 @@ async def test_exchange_bind(self, channel): @pytest.mark.trio async def test_inexistant_exchange_bind(self, channel): with pytest.raises(exceptions.ChannelClosed) as cm: - await channel.exchange_bind( - 'exchange_destination', 'exchange_source', routing_key='' - ) + await channel.exchange_bind('exchange_destination', 'exchange_source', routing_key='') assert cm.value.code == 404 @@ -175,9 +144,7 @@ async def test_exchange_unbind(self, channel): await channel.exchange_bind(ex_destination, ex_source, routing_key='') - await channel.exchange_unbind( - ex_destination, ex_source, routing_key='' - ) + await channel.exchange_unbind(ex_destination, ex_source, routing_key='') @pytest.mark.trio async def test_exchange_unbind_reversed(self, channel): @@ -190,15 +157,11 @@ async def test_exchange_unbind_reversed(self, channel): if self.server_version(channel.protocol) < (3, 3, 5): with pytest.raises(exceptions.ChannelClosed) as cm: - result = await channel.exchange_unbind( - ex_source, ex_destination, routing_key='' - ) + result = await channel.exchange_unbind(ex_source, ex_destination, routing_key='') assert cm.value.code == 404 else: # weird result from rabbitmq 3.3.5 - result = await channel.exchange_unbind( - ex_source, ex_destination, routing_key='' - ) + result = await channel.exchange_unbind(ex_source, ex_destination, routing_key='') assert result diff --git a/tests/test_frame.py b/tests/test_frame.py index d37eefd..47a2aac 100644 --- a/tests/test_frame.py +++ b/tests/test_frame.py @@ -42,23 +42,18 @@ def test_write_array(self): def test_write_float(self): self.encoder.write_value(1.1) - assert self.encoder.payload.getvalue( - ) == b'd?\xf1\x99\x99\x99\x99\x99\x9a' + assert self.encoder.payload.getvalue() == b'd?\xf1\x99\x99\x99\x99\x99\x9a' def test_write_decimal(self): self.encoder.write_value(Decimal("-1.1")) assert self.encoder.payload.getvalue() == b'D\x01\xff\xff\xff\xf5' self.encoder.write_value(Decimal("1.1")) - assert self.encoder.payload.getvalue( - ) == b'D\x01\xff\xff\xff\xf5D\x01\x00\x00\x00\x0b' + assert self.encoder.payload.getvalue() == b'D\x01\xff\xff\xff\xf5D\x01\x00\x00\x00\x0b' def test_write_datetime(self): - self.encoder.write_value( - datetime.datetime(2017, 12, 10, 4, 6, 49, 548918) - ) - assert self.encoder.payload.getvalue( - ) == b'T\x00\x00\x00\x00Z,\xb2\xd9' + self.encoder.write_value(datetime.datetime(2017, 12, 10, 4, 6, 49, 548918)) + assert self.encoder.payload.getvalue() == b'T\x00\x00\x00\x00Z,\xb2\xd9' def test_write_dict(self): self.encoder.write_value({'foo': 'bar', 'bar': 'baz'}) diff --git a/tests/test_properties.py b/tests/test_properties.py index 63e6114..02f0e92 100644 --- a/tests/test_properties.py +++ b/tests/test_properties.py @@ -15,8 +15,10 @@ exchange_name = 'exchange_name' server_routing_key = 'reply_test' + class TestReplyOld(testcase.RabbitTestCase): """RPC test using classic callbacks""" + async def _server( self, amqp, @@ -29,31 +31,22 @@ async def _server( to the client using routing key set to the reply_to property """ async with amqp.new_channel() as channel: - await channel.queue_declare( - server_queue_name, exclusive=False, no_wait=False - ) + await channel.queue_declare(server_queue_name, exclusive=False, no_wait=False) await channel.exchange_declare(exchange_name, type_name='direct') - await channel.queue_bind( - server_queue_name, exchange_name, routing_key=routing_key - ) + await channel.queue_bind(server_queue_name, exchange_name, routing_key=routing_key) async def server_callback(channel, body, envelope, properties): logger.debug('Server received message') - publish_properties = { - 'correlation_id': properties.correlation_id - } + publish_properties = {'correlation_id': properties.correlation_id} logger.debug('Replying to %r', properties.reply_to) await channel.publish( - b'reply message', exchange_name, properties.reply_to, - publish_properties + b'reply message', exchange_name, properties.reply_to, publish_properties ) server_future.test_result = (body, envelope, properties) server_future.set() logger.debug('Server replied') - await channel.basic_consume( - server_callback, queue_name=server_queue_name - ) + await channel.basic_consume(server_callback, queue_name=server_queue_name) logger.debug('Server consuming messages') task_status.started() await server_future.wait() @@ -73,13 +66,9 @@ async def _client( routing key """ async with amqp.new_channel() as client_channel: - await client_channel.queue_declare( - client_queue_name, exclusive=True, no_wait=False - ) + await client_channel.queue_declare(client_queue_name, exclusive=True, no_wait=False) await client_channel.queue_bind( - client_queue_name, - exchange_name, - routing_key=client_routing_key + client_queue_name, exchange_name, routing_key=client_routing_key ) async def client_callback(channel, body, envelope, properties): @@ -87,9 +76,7 @@ async def client_callback(channel, body, envelope, properties): client_future.test_result = (body, envelope, properties) client_future.set() - await client_channel.basic_consume( - client_callback, queue_name=client_queue_name - ) + await client_channel.basic_consume(client_callback, queue_name=client_queue_name) logger.debug('Client consuming messages') task_status.started() @@ -106,18 +93,15 @@ async def client_callback(channel, body, envelope, properties): async def test_reply_to(self, amqp): server_future = trio.Event() async with trio.open_nursery() as n: - await n.start( - self._server, amqp, server_future, exchange_name, - server_routing_key - ) + await n.start(self._server, amqp, server_future, exchange_name, server_routing_key) correlation_id = 'secret correlation id' client_routing_key = 'secret_client_key' client_future = trio.Event() await n.start( - self._client, amqp, client_future, exchange_name, - server_routing_key, correlation_id, client_routing_key + self._client, amqp, client_future, exchange_name, server_routing_key, + correlation_id, client_routing_key ) logger.debug('Waiting for server to receive message') @@ -141,6 +125,7 @@ async def test_reply_to(self, amqp): class TestReplyNew(testcase.RabbitTestCase): """RPC test using iteration""" + async def _server( self, amqp, @@ -153,13 +138,9 @@ async def _server( to the client using routing key set to the reply_to property """ async with amqp.new_channel() as channel: - await channel.queue_declare( - server_queue_name, exclusive=False, no_wait=False - ) + await channel.queue_declare(server_queue_name, exclusive=False, no_wait=False) await channel.exchange_declare(exchange_name, type_name='direct') - await channel.queue_bind( - server_queue_name, exchange_name, routing_key=routing_key - ) + await channel.queue_bind(server_queue_name, exchange_name, routing_key=routing_key) async with trio.open_nursery() as n: await n.start(self._server_consumer, channel, server_future) @@ -177,13 +158,10 @@ async def _server_consumer(self, channel, server_future, task_status=trio.TASK_S async for body, envelope, properties in data: logger.debug('Server received message') - publish_properties = { - 'correlation_id': properties.correlation_id - } + publish_properties = {'correlation_id': properties.correlation_id} logger.debug('Replying to %r', properties.reply_to) await channel.publish( - b'reply message', exchange_name, properties.reply_to, - publish_properties + b'reply message', exchange_name, properties.reply_to, publish_properties ) server_future.test_result = (body, envelope, properties) server_future.set() @@ -204,13 +182,9 @@ async def _client( routing key """ async with amqp.new_channel() as client_channel: - await client_channel.queue_declare( - client_queue_name, exclusive=True, no_wait=False - ) + await client_channel.queue_declare(client_queue_name, exclusive=True, no_wait=False) await client_channel.queue_bind( - client_queue_name, - exchange_name, - routing_key=client_routing_key + client_queue_name, exchange_name, routing_key=client_routing_key ) async with trio.open_nursery() as n: @@ -244,18 +218,15 @@ async def _client_consumer(self, channel, client_future, task_status=trio.TASK_S async def test_reply_to(self, amqp): server_future = trio.Event() async with trio.open_nursery() as n: - await n.start( - self._server, amqp, server_future, exchange_name, - server_routing_key - ) + await n.start(self._server, amqp, server_future, exchange_name, server_routing_key) correlation_id = 'secret correlation id' client_routing_key = 'secret_client_key' client_future = trio.Event() await n.start( - self._client, amqp, client_future, exchange_name, - server_routing_key, correlation_id, client_routing_key + self._client, amqp, client_future, exchange_name, server_routing_key, + correlation_id, client_routing_key ) logger.debug('Waiting for server to receive message') @@ -275,4 +246,3 @@ async def test_reply_to(self, amqp): assert client_properties.correlation_id == correlation_id assert client_envelope.routing_key == client_routing_key n.cancel_scope.cancel() - diff --git a/tests/test_queue.py b/tests/test_queue.py index e039d1f..7239f25 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -35,8 +35,7 @@ async def test_queue_declare(self, channel): result = await channel.queue_declare(queue_name) assert result['message_count'] == 0, result assert result['consumer_count'] == 0, result - assert channel.protocol.local_name(result['queue'] - ) == queue_name, result + assert channel.protocol.local_name(result['queue']) == queue_name, result assert result @pytest.mark.trio @@ -46,8 +45,7 @@ async def test_queue_declare_passive(self, channel): result = await channel.queue_declare(queue_name, passive=True) assert result['message_count'] == 0, result assert result['consumer_count'] == 0, result - assert channel.protocol.local_name(result['queue'] - ) == queue_name, result + assert channel.protocol.local_name(result['queue']) == queue_name, result @pytest.mark.trio async def test_queue_declare_passive_nonexistant_queue(self, channel): @@ -60,9 +58,7 @@ async def test_queue_declare_passive_nonexistant_queue(self, channel): @pytest.mark.trio async def test_wrong_parameter_queue(self, channel): queue_name = 'q18' - await channel.queue_declare( - queue_name, exclusive=False, auto_delete=False - ) + await channel.queue_declare(queue_name, exclusive=False, auto_delete=False) with pytest.raises(exceptions.ChannelClosed) as cm: await channel.queue_declare( @@ -78,27 +74,18 @@ async def test_multiple_channel_same_queue(self, amqp): async with amqp.new_channel() as channel1: async with amqp.new_channel() as channel2: - result = await channel1.queue_declare( - queue_name, passive=False - ) + result = await channel1.queue_declare(queue_name, passive=False) assert result['message_count'] == 0, result assert result['consumer_count'] == 0, result assert amqp.local_name(result['queue']) == queue_name, result - result = await channel2.queue_declare( - queue_name, passive=False - ) + result = await channel2.queue_declare(queue_name, passive=False) assert result['message_count'] == 0, result assert result['consumer_count'] == 0, result assert amqp.local_name(result['queue']) == queue_name, result async def _test_queue_declare( - self, - amqp, - queue_name, - exclusive=False, - durable=False, - auto_delete=False + self, amqp, queue_name, exclusive=False, durable=False, auto_delete=False ): # declare queue async with amqp.new_channel() as channel: @@ -128,9 +115,7 @@ async def _test_queue_declare( @pytest.mark.trio async def test_durable_and_auto_deleted(self, amqp): - await self._test_queue_declare( - amqp, 'q1', exclusive=False, durable=True, auto_delete=True - ) + await self._test_queue_declare(amqp, 'q1', exclusive=False, durable=True, auto_delete=True) @pytest.mark.trio async def test_durable_and_not_auto_deleted(self, amqp): @@ -156,18 +141,14 @@ async def test_exclusive(self, amqp): # create an exclusive queue await channel.queue_declare("q5", exclusive=True) # consume from it - await channel.basic_consume( - self.callback, queue_name="q5", no_wait=False - ) + await channel.basic_consume(self.callback, queue_name="q5", no_wait=False) # create another amqp connection async with self.create_amqp() as amqp: async with amqp.new_channel() as channel: # assert that this connection cannot connect to the queue with pytest.raises(exceptions.ChannelClosed): - await channel.basic_consume( - self.callback, queue_name="q5", no_wait=False - ) + await channel.basic_consume(self.callback, queue_name="q5", no_wait=False) # channels are auto deleted by test case @pytest.mark.trio @@ -176,16 +157,12 @@ async def test_not_exclusive(self, amqp): # create a non-exclusive queue await channel.queue_declare('q6', exclusive=False) # consume it - await channel.basic_consume( - self.callback, queue_name='q6', no_wait=False - ) + await channel.basic_consume(self.callback, queue_name='q6', no_wait=False) # create an other amqp connection async with self.create_amqp(test_seq=amqp.test_seq) as amqp2: async with amqp2.new_channel() as channel: # assert that this connection can connect to the queue - await channel.basic_consume( - self.callback, queue_name='q6', no_wait=False - ) + await channel.basic_consume(self.callback, queue_name='q6', no_wait=False) class TestQueueDelete(testcase.RabbitTestCase): @@ -220,9 +197,7 @@ async def test_bind_queue(self, channel): await channel.queue_declare(queue_name) await channel.exchange_declare(exchange_name, type_name='direct') - result = await channel.queue_bind( - queue_name, exchange_name, routing_key='' - ) + result = await channel.queue_bind(queue_name, exchange_name, routing_key='') assert result @pytest.mark.trio @@ -256,9 +231,7 @@ async def test_unbind_queue(self, channel): await channel.queue_bind(queue_name, exchange_name, routing_key='') - result = await channel.queue_unbind( - queue_name, exchange_name, routing_key='' - ) + result = await channel.queue_unbind(queue_name, exchange_name, routing_key='') assert result diff --git a/tests/testcase.py b/tests/testcase.py index 645b78c..3f9ea25 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -28,8 +28,7 @@ def use_full_name(f, arg_names): for arg_name in arg_names: if arg_name not in sig.parameters: raise ValueError( - '%s is not a valid argument name for function %s' % - (arg_name, f.__qualname__) + '%s is not a valid argument name for function %s' % (arg_name, f.__qualname__) ) @wraps(f) @@ -37,27 +36,19 @@ def wrapper(self, *args, **kw): ba = sig.bind_partial(self, *args, **kw) for param in sig.parameters.values(): if param.name in arg_names and param.name in ba.arguments: - ba.arguments[param.name] = self.full_name( - ba.arguments[param.name] - ) + ba.arguments[param.name] = self.full_name(ba.arguments[param.name]) return f(*(ba.args), **(ba.kwargs)) return wrapper class ProxyChannel(Channel): - exchange_declare = use_full_name( - Channel.exchange_declare, ['exchange_name'] - ) + exchange_declare = use_full_name(Channel.exchange_declare, ['exchange_name']) exchange_delete = use_full_name(Channel.exchange_delete, ['exchange_name']) queue_declare = use_full_name(Channel.queue_declare, ['queue_name']) queue_delete = use_full_name(Channel.queue_delete, ['queue_name']) - queue_bind = use_full_name( - Channel.queue_bind, ['queue_name', 'exchange_name'] - ) - queue_unbind = use_full_name( - Channel.queue_unbind, ['queue_name', 'exchange_name'] - ) + queue_bind = use_full_name(Channel.queue_bind, ['queue_name', 'exchange_name']) + queue_unbind = use_full_name(Channel.queue_unbind, ['queue_name', 'exchange_name']) queue_purge = use_full_name(Channel.queue_purge, ['queue_name']) exchange_bind = use_full_name( @@ -173,10 +164,7 @@ def setup(self): self.port = int(os.environ.get('AMQP_PORT', 5672)) self.vhost = os.environ.get('AMQP_VHOST', 'test' + str(uuid.uuid4())) self.http_client = pyrabbit.api.Client( - '%s:%s/' % (self.host, 10000 + self.port), - 'guest', - 'guest', - timeout=20 + '%s:%s/' % (self.host, 10000 + self.port), 'guest', 'guest', timeout=20 ) self.amqps = [] @@ -211,9 +199,7 @@ def reset_vhost(self): reset_vhost() # global def server_version(self, amqp): - server_version = tuple( - int(x) for x in amqp.server_properties['version'].split('.') - ) + server_version = tuple(int(x) for x in amqp.server_properties['version'].split('.')) return server_version async def check_exchange_exists(self, exchange_name): @@ -242,9 +228,7 @@ async def assertQueueExists(self, queue_name): if not self.check_queue_exists(queue_name): self.fail("Queue {} does not exists".format(queue_name)) - def list_queues( - self, amqp, vhost=None, fully_qualified_name=False, delay=None - ): + def list_queues(self, amqp, vhost=None, fully_qualified_name=False, delay=None): # wait for the http client to get the correct state of the queue if delay is None: delay = int(os.environ.get('AMQP_REFRESH_TIME', 1.1)) @@ -290,15 +274,9 @@ async def safe_queue_delete(self, queue_name, channel): try: await channel.queue_delete(full_queue_name, no_wait=False) except trio.TooSlowError: - logger.warning( - 'Timeout on queue %s deletion', full_queue_name, exc_info=True - ) + logger.warning('Timeout on queue %s deletion', full_queue_name, exc_info=True) except Exception: # pylint: disable=broad-except - logger.error( - 'Unexpected error on queue %s deletion', - full_queue_name, - exc_info=True - ) + logger.error('Unexpected error on queue %s deletion', full_queue_name, exc_info=True) async def safe_exchange_delete(self, exchange_name, channel=None): """Delete the exchange but does not raise any exception if it fails @@ -310,21 +288,13 @@ async def safe_exchange_delete(self, exchange_name, channel=None): try: await channel.exchange_delete(full_exchange_name, no_wait=False) except trio.TooSlowError: - logger.warning( - 'Timeout on exchange %s deletion', - full_exchange_name, - exc_info=True - ) + logger.warning('Timeout on exchange %s deletion', full_exchange_name, exc_info=True) except Exception: # pylint: disable=broad-except logger.error( - 'Unexpected error on exchange %s deletion', - full_exchange_name, - exc_info=True + 'Unexpected error on exchange %s deletion', full_exchange_name, exc_info=True ) - async def queue_declare( - self, queue_name, *args, channel=None, safe_delete_before=True, **kw - ): + async def queue_declare(self, queue_name, *args, channel=None, safe_delete_before=True, **kw): channel = channel or self.channel if safe_delete_before: await self.safe_queue_delete(queue_name, channel=channel) @@ -338,12 +308,7 @@ async def queue_declare( return rep async def exchange_declare( - self, - exchange_name, - *args, - channel=None, - safe_delete_before=True, - **kw + self, exchange_name, *args, channel=None, safe_delete_before=True, **kw ): channel = channel or self.channel if safe_delete_before: @@ -351,9 +316,7 @@ async def exchange_declare( # prefix exchange name full_exchange_name = self.full_name(exchange_name) try: - rep = await channel.exchange_declare( - full_exchange_name, *args, **kw - ) + rep = await channel.exchange_declare(full_exchange_name, *args, **kw) finally: self.exchanges[exchange_name] = (exchange_name, channel) return rep @@ -361,10 +324,7 @@ async def exchange_declare( @asynccontextmanager async def create_amqp(self, vhost=None, test_seq=None): async with testcase.connect( - host=self.host, - port=self.port, - virtualhost=vhost or self.vhost, - test_seq=test_seq + host=self.host, port=self.port, virtualhost=vhost or self.vhost, test_seq=test_seq ) as protocol: self.amqps.append(protocol) yield protocol diff --git a/tox.ini b/tox.ini index 1ea00ba..ef3712d 100644 --- a/tox.ini +++ b/tox.ini @@ -9,3 +9,6 @@ deps = -rci/requirements_dev.txt commands = py.test-3 + +[flake8] +max-line-length=99 diff --git a/trio_amqp/__init__.py b/trio_amqp/__init__.py index 7f90c5d..718946f 100644 --- a/trio_amqp/__init__.py +++ b/trio_amqp/__init__.py @@ -27,10 +27,7 @@ async def connect_from_url(url, **kwargs): url = urlparse(url) if url.scheme not in ('amqp', 'amqps'): - raise ValueError( - 'Invalid protocol %s, valid protocols are amqp or amqps' % - url.scheme - ) + raise ValueError('Invalid protocol %s, valid protocols are amqp or amqps' % url.scheme) if url.hostname: kwargs['host'] = url.hostname diff --git a/trio_amqp/channel.py b/trio_amqp/channel.py index 6090bdb..67d0294 100644 --- a/trio_amqp/channel.py +++ b/trio_amqp/channel.py @@ -38,9 +38,7 @@ async def __anext__(self): return await self._q.get() async def __aenter__(self): - await self.channel.basic_consume( - self._data, consumer_tag=self.consumer_tag, **self.kwargs - ) + await self.channel.basic_consume(self._data, consumer_tag=self.consumer_tag, **self.kwargs) self._q = trio.Queue(30) # TODO: 2 + possible prefetch return self @@ -94,9 +92,7 @@ def _set_waiter(self, rpc_name): def _get_waiter(self, rpc_name): fut = self._futures.pop(rpc_name, None) if not fut: - raise exceptions.SynchronizationError( - "Call %s didn't set a waiter" % rpc_name - ) + raise exceptions.SynchronizationError("Call %s didn't set a waiter" % rpc_name) return fut @property @@ -105,9 +101,7 @@ def is_open(self): return False return not self.close_event.is_set() - def connection_closed( - self, server_code=None, server_reason=None, exception=None - ): + def connection_closed(self, server_code=None, server_reason=None, exception=None): for future in self._futures.values(): if future.done(): continue @@ -133,8 +127,7 @@ async def dispatch_frame(self, frame): self.close_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE): self.server_channel_close, - (amqp_constants.CLASS_EXCHANGE, - amqp_constants.EXCHANGE_DECLARE_OK): + (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DECLARE_OK): self.exchange_declare_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_BIND_OK): self.exchange_bind_ok, @@ -178,8 +171,7 @@ async def dispatch_frame(self, frame): if (frame.class_id, frame.method_id) not in methods: raise NotImplementedError( - "Frame (%s, %s) is not implemented" % - (frame.class_id, frame.method_id) + "Frame (%s, %s) is not implemented" % (frame.class_id, frame.method_id) ) await methods[(frame.class_id, frame.method_id)](frame) @@ -202,16 +194,12 @@ async def _write_frame_awaiting_response( (unless no_wait is set) ''' if no_wait: - await self._write_frame( - frame, request, check_open=check_open, drain=drain - ) + await self._write_frame(frame, request, check_open=check_open, drain=drain) return None f = self._set_waiter(waiter_id) try: - await self._write_frame( - frame, request, check_open=check_open, drain=drain - ) + await self._write_frame(frame, request, check_open=check_open, drain=drain) except BaseException as exc: self._get_waiter(waiter_id) f.cancel() @@ -225,12 +213,8 @@ async def _write_frame_awaiting_response( async def open(self): """Open the channel on the server.""" - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_OPEN - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_OPEN) request = amqp_frame.AmqpEncoder() request.write_shortstr('') return ( @@ -250,12 +234,8 @@ async def close(self, reply_code=0, reply_text="Normal Shutdown"): if not self.is_open: raise exceptions.ChannelClosed("channel already closed or closing") self.close_event.set() - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE) request = amqp_frame.AmqpEncoder() request.write_short(reply_code) request.write_shortstr(reply_text) @@ -274,12 +254,8 @@ async def close_ok(self, frame): self.protocol.release_channel_id(self.channel_id) async def _send_channel_close_ok(self): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE_OK - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE_OK) request = amqp_frame.AmqpEncoder() # intentionally not locked await self._write_frame(frame, request) @@ -295,12 +271,8 @@ async def server_channel_close(self, frame): self.connection_closed(results['reply_code'], results['reply_text']) async def flow(self, active): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_FLOW - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_FLOW) request = amqp_frame.AmqpEncoder() request.write_bits(active) async with self._write_lock: @@ -333,12 +305,8 @@ async def exchange_declare( no_wait=False, arguments=None ): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DECLARE - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DECLARE) request = amqp_frame.AmqpEncoder() # short reserved-1 request.write_short(0) @@ -351,9 +319,8 @@ async def exchange_declare( async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'exchange_declare', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('exchange_declare', frame, request, no_wait) ) async def exchange_declare_ok(self, frame): @@ -362,15 +329,9 @@ async def exchange_declare_ok(self, frame): logger.debug("Exchange declared") return future - async def exchange_delete( - self, exchange_name, if_unused=False, no_wait=False - ): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DELETE - ) + async def exchange_delete(self, exchange_name, if_unused=False, no_wait=False): + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DELETE) request = amqp_frame.AmqpEncoder() # short reserved-1 request.write_short(0) @@ -379,9 +340,8 @@ async def exchange_delete( async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'exchange_delete', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('exchange_delete', frame, request, no_wait) ) async def exchange_delete_ok(self, frame): @@ -390,21 +350,12 @@ async def exchange_delete_ok(self, frame): logger.debug("Exchange deleted") async def exchange_bind( - self, - exchange_destination, - exchange_source, - routing_key, - no_wait=False, - arguments=None + self, exchange_destination, exchange_source, routing_key, no_wait=False, arguments=None ): if arguments is None: arguments = {} - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_BIND - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_BIND) request = amqp_frame.AmqpEncoder() request.write_short(0) # reserved @@ -416,9 +367,8 @@ async def exchange_bind( request.write_table(arguments) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'exchange_bind', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('exchange_bind', frame, request, no_wait) ) async def exchange_bind_ok(self, frame): @@ -427,21 +377,12 @@ async def exchange_bind_ok(self, frame): logger.debug("Exchange bound") async def exchange_unbind( - self, - exchange_destination, - exchange_source, - routing_key, - no_wait=False, - arguments=None + self, exchange_destination, exchange_source, routing_key, no_wait=False, arguments=None ): if arguments is None: arguments = {} - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.EXCHANGE_UNBIND, amqp_constants.EXCHANGE_UNBIND - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.EXCHANGE_UNBIND, amqp_constants.EXCHANGE_UNBIND) request = amqp_frame.AmqpEncoder() request.write_short(0) # reserved @@ -453,9 +394,8 @@ async def exchange_unbind( request.write_table(arguments) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'exchange_unbind', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('exchange_unbind', frame, request, no_wait) ) async def exchange_unbind_ok(self, frame): @@ -504,12 +444,8 @@ async def queue_declare( if not queue_name: queue_name = '' - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DECLARE - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DECLARE) request = amqp_frame.AmqpEncoder() request.write_short(0) # reserved request.write_shortstr(queue_name) @@ -517,9 +453,8 @@ async def queue_declare( request.write_table(arguments) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'queue_declare', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('queue_declare', frame, request, no_wait) ) async def queue_declare_ok(self, frame): @@ -532,9 +467,7 @@ async def queue_declare_ok(self, frame): future.set_result(results) logger.debug("Queue declared") - async def queue_delete( - self, queue_name, if_unused=False, if_empty=False, no_wait=False - ): + async def queue_delete(self, queue_name, if_unused=False, if_empty=False, no_wait=False): """Delete a queue in RabbitMQ Args: queue_name: @@ -548,12 +481,8 @@ async def queue_delete( no_wait: bool, if set, the server will not respond to the method """ - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DELETE - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DELETE) request = amqp_frame.AmqpEncoder() request.write_short(0) # reserved @@ -561,9 +490,7 @@ async def queue_delete( request.write_bits(if_unused, if_empty, no_wait) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'queue_delete', frame, request, no_wait - ) + await self._write_frame_awaiting_response('queue_delete', frame, request, no_wait) ) async def queue_delete_ok(self, frame): @@ -572,22 +499,13 @@ async def queue_delete_ok(self, frame): logger.debug("Queue deleted") async def queue_bind( - self, - queue_name, - exchange_name, - routing_key, - no_wait=False, - arguments=None + self, queue_name, exchange_name, routing_key, no_wait=False, arguments=None ): """Bind a queue to an exchange.""" if arguments is None: arguments = {} - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_BIND - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_BIND) request = amqp_frame.AmqpEncoder() # short reserved-1 @@ -599,9 +517,7 @@ async def queue_bind( request.write_table(arguments) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'queue_bind', frame, request, no_wait - ) + await self._write_frame_awaiting_response('queue_bind', frame, request, no_wait) ) async def queue_bind_ok(self, frame): @@ -609,17 +525,11 @@ async def queue_bind_ok(self, frame): future.set_result(True) logger.debug("Queue bound") - async def queue_unbind( - self, queue_name, exchange_name, routing_key, arguments=None - ): + async def queue_unbind(self, queue_name, exchange_name, routing_key, arguments=None): if arguments is None: arguments = {} - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_UNBIND - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_UNBIND) request = amqp_frame.AmqpEncoder() # short reserved-1 @@ -630,9 +540,8 @@ async def queue_unbind( request.write_table(arguments) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'queue_unbind', frame, request, no_wait=False - ) + await + self._write_frame_awaiting_response('queue_unbind', frame, request, no_wait=False) ) async def queue_unbind_ok(self, frame): @@ -641,12 +550,8 @@ async def queue_unbind_ok(self, frame): logger.debug("Queue unbound") async def queue_purge(self, queue_name, no_wait=False): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_PURGE - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_PURGE) request = amqp_frame.AmqpEncoder() # short reserved-1 @@ -682,12 +587,8 @@ async def basic_publish( assert payload, "Payload cannot be empty" async with self._write_lock: - method_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - method_frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_PUBLISH - ) + method_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + method_frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_PUBLISH) method_request = amqp_frame.AmqpEncoder() method_request.write_short(0) method_request.write_shortstr(exchange_name) @@ -695,9 +596,7 @@ async def basic_publish( method_request.write_bits(mandatory, immediate) await self._write_frame(method_frame, method_request, drain=False) - header_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_HEADER, self.channel_id - ) + header_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_HEADER, self.channel_id) header_frame.declare_class(amqp_constants.CLASS_BASIC) header_frame.set_body_size(len(payload)) encoder = amqp_frame.AmqpEncoder() @@ -707,14 +606,9 @@ async def basic_publish( # split the payload frame_max = self.protocol.server_frame_max or len(payload) - for chunk in ( - payload[0 + i:frame_max + i] - for i in range(0, len(payload), frame_max) - ): + for chunk in (payload[0 + i:frame_max + i] for i in range(0, len(payload), frame_max)): - content_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_BODY, self.channel_id - ) + content_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_BODY, self.channel_id) content_frame.declare_class(amqp_constants.CLASS_BASIC) encoder = amqp_frame.AmqpEncoder() if isinstance(chunk, str): @@ -725,9 +619,7 @@ async def basic_publish( await self.protocol._drain() - async def basic_qos( - self, prefetch_size=0, prefetch_count=0, connection_global=None - ): + async def basic_qos(self, prefetch_size=0, prefetch_count=0, connection_global=None): """Specifies quality of service. Args: @@ -746,12 +638,8 @@ async def basic_qos( per-consumer channel; and global=true to mean that the QoS settings should apply per-channel. """ - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_QOS - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_QOS) request = amqp_frame.AmqpEncoder() request.write_long(prefetch_size) request.write_short(prefetch_count) @@ -759,9 +647,8 @@ async def basic_qos( async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'basic_qos', frame, request, no_wait=False - ) + await + self._write_frame_awaiting_response('basic_qos', frame, request, no_wait=False) ) async def basic_qos_ok(self, frame): @@ -828,9 +715,7 @@ def new_consumer( :meth:`basic_client_nack` on the envelope's :attribute:`delivery_tag`. """ # If a consumer tag was not passed, create one - consumer_tag = consumer_tag or 'ctag%i.%s' % ( - self.channel_id, uuid.uuid4().hex - ) + consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_id, uuid.uuid4().hex) if arguments is None: arguments = {} @@ -899,19 +784,13 @@ async def basic_consume( :attribute:`delivery_tag`. """ # If a consumer tag was not passed, create one - consumer_tag = consumer_tag or 'ctag%i.%s' % ( - self.channel_id, uuid.uuid4().hex - ) + consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_id, uuid.uuid4().hex) if arguments is None: arguments = {} - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CONSUME - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CONSUME) request = amqp_frame.AmqpEncoder() request.write_short(0) request.write_shortstr(queue_name) @@ -956,10 +835,7 @@ async def basic_deliver(self, frame): buffer.write(content_body_frame.payload) body = buffer.getvalue() - envelope = Envelope( - consumer_tag, delivery_tag, exchange_name, routing_key, - is_redeliver - ) + envelope = Envelope(consumer_tag, delivery_tag, exchange_name, routing_key, is_redeliver) properties = content_header_frame.properties callback = self.consumer_callbacks[consumer_tag] @@ -974,9 +850,7 @@ async def basic_deliver(self, frame): else: cbi = inspect.getfullargspec(callback) if 'task_status' in cbi.args or 'task_status' in cbi.kwonlyargs: - await self.protocol._nursery.start( - callback, self, body, envelope, properties - ) + await self.protocol._nursery.start(callback, self, body, envelope, properties) else: await callback(self, body, envelope, properties) @@ -988,12 +862,8 @@ async def server_basic_cancel(self, frame): logger.info("consume cancelled received") async def basic_cancel(self, consumer_tag, no_wait=False): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL) request = amqp_frame.AmqpEncoder() request.write_shortstr(consumer_tag) request.write_bits(no_wait) @@ -1013,21 +883,16 @@ async def basic_cancel_ok(self, frame): logger.debug("Cancel ok") async def basic_get(self, queue_name='', no_ack=False): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET) request = amqp_frame.AmqpEncoder() request.write_short(0) request.write_shortstr(queue_name) request.write_bits(no_ack) async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'basic_get', frame, request, no_wait=False - ) + await + self._write_frame_awaiting_response('basic_get', frame, request, no_wait=False) ) async def basic_get_ok(self, frame): @@ -1055,27 +920,17 @@ async def basic_get_empty(self, frame): future.set_exception(exceptions.EmptyQueue) async def basic_client_ack(self, delivery_tag, multiple=False): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_ACK - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_ACK) request = amqp_frame.AmqpEncoder() request.write_long_long(delivery_tag) request.write_bits(multiple) async with self._write_lock: await self._write_frame(frame, request) - async def basic_client_nack( - self, delivery_tag, multiple=False, requeue=True - ): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_NACK - ) + async def basic_client_nack(self, delivery_tag, multiple=False, requeue=True): + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_NACK) request = amqp_frame.AmqpEncoder() request.write_long_long(delivery_tag) request.write_bits(multiple, requeue) @@ -1090,12 +945,8 @@ async def basic_server_ack(self, frame): fut.set_result(True) async def basic_reject(self, delivery_tag, requeue=False): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_REJECT - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_REJECT) request = amqp_frame.AmqpEncoder() request.write_long_long(delivery_tag) request.write_bits(requeue) @@ -1103,24 +954,16 @@ async def basic_reject(self, delivery_tag, requeue=False): await self._write_frame(frame, request) async def basic_recover_async(self, requeue=True): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER_ASYNC - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER_ASYNC) request = amqp_frame.AmqpEncoder() request.write_bits(requeue) async with self._write_lock: await self._write_frame(frame, request) async def basic_recover(self, requeue=True): - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER) request = amqp_frame.AmqpEncoder() request.write_bits(requeue) async with self._write_lock: @@ -1157,16 +1000,10 @@ async def publish( async with self._write_lock: if self.publisher_confirms: delivery_tag = next(self.delivery_tag_iter) - fut = self._set_waiter( - 'basic_server_ack_{}'.format(delivery_tag) - ) + fut = self._set_waiter('basic_server_ack_{}'.format(delivery_tag)) - method_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - method_frame.declare_method( - amqp_constants.CLASS_BASIC, amqp_constants.BASIC_PUBLISH - ) + method_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + method_frame.declare_method(amqp_constants.CLASS_BASIC, amqp_constants.BASIC_PUBLISH) method_request = amqp_frame.AmqpEncoder() method_request.write_short(0) method_request.write_shortstr(exchange_name) @@ -1174,9 +1011,7 @@ async def publish( method_request.write_bits(mandatory, immediate) await self._write_frame(method_frame, method_request, drain=False) - header_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_HEADER, self.channel_id - ) + header_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_HEADER, self.channel_id) header_frame.declare_class(amqp_constants.CLASS_BASIC) header_frame.set_body_size(len(payload)) encoder = amqp_frame.AmqpEncoder() @@ -1186,14 +1021,9 @@ async def publish( # split the payload frame_max = self.protocol.server_frame_max or len(payload) - for chunk in ( - payload[0 + i:frame_max + i] - for i in range(0, len(payload), frame_max) - ): + for chunk in (payload[0 + i:frame_max + i] for i in range(0, len(payload), frame_max)): - content_frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_BODY, self.channel_id - ) + content_frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_BODY, self.channel_id) content_frame.declare_class(amqp_constants.CLASS_BASIC) encoder = amqp_frame.AmqpEncoder() if isinstance(chunk, str): @@ -1210,20 +1040,15 @@ async def publish( async def confirm_select(self, *, no_wait=False): if self.publisher_confirms: raise ValueError('publisher confirms already enabled') - frame = amqp_frame.AmqpRequest( - amqp_constants.TYPE_METHOD, self.channel_id - ) - frame.declare_method( - amqp_constants.CLASS_CONFIRM, amqp_constants.CONFIRM_SELECT - ) + frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, self.channel_id) + frame.declare_method(amqp_constants.CLASS_CONFIRM, amqp_constants.CONFIRM_SELECT) request = amqp_frame.AmqpEncoder() request.write_shortstr('') async with self._write_lock: return ( - await self._write_frame_awaiting_response( - 'confirm_select', frame, request, no_wait - ) + await + self._write_frame_awaiting_response('confirm_select', frame, request, no_wait) ) async def confirm_select_ok(self, frame): diff --git a/trio_amqp/constants.py b/trio_amqp/constants.py index 5a1aa6e..f7945b7 100644 --- a/trio_amqp/constants.py +++ b/trio_amqp/constants.py @@ -93,9 +93,8 @@ CONFIRM_SELECT_OK = 11 MESSAGE_PROPERTIES = ( - 'content_type', 'content_encoding', 'headers', 'delivery_mode', 'priority', - 'correlation_id', 'reply_to', 'expiration', 'message_id', 'timestamp', - 'type', 'user_id', 'app_id', 'cluster_id' + 'content_type', 'content_encoding', 'headers', 'delivery_mode', 'priority', 'correlation_id', + 'reply_to', 'expiration', 'message_id', 'timestamp', 'type', 'user_id', 'app_id', 'cluster_id' ) FLAG_CONTENT_TYPE = (1 << 15) diff --git a/trio_amqp/envelope.py b/trio_amqp/envelope.py index 64a2cdb..e097b68 100644 --- a/trio_amqp/envelope.py +++ b/trio_amqp/envelope.py @@ -5,15 +5,9 @@ class Envelope: """Class for basic deliver message fields""" - __slots__ = ( - 'consumer_tag', 'delivery_tag', 'exchange_name', 'routing_key', - 'is_redeliver' - ) + __slots__ = ('consumer_tag', 'delivery_tag', 'exchange_name', 'routing_key', 'is_redeliver') - def __init__( - self, consumer_tag, delivery_tag, exchange_name, routing_key, - is_redeliver - ): + def __init__(self, consumer_tag, delivery_tag, exchange_name, routing_key, is_redeliver): self.consumer_tag = consumer_tag self.delivery_tag = delivery_tag self.exchange_name = exchange_name diff --git a/trio_amqp/exceptions.py b/trio_amqp/exceptions.py index 4847c7e..f2c141d 100644 --- a/trio_amqp/exceptions.py +++ b/trio_amqp/exceptions.py @@ -43,10 +43,7 @@ def __init__(self, code=0, message='Channel is closed'): class DuplicateConsumerTag(TrioAmqpException): def __repr__(self): - return ( - 'The consumer tag specified already exists for this ' - 'channel: %s' % self.args[0] - ) + return ('The consumer tag specified already exists for this ' 'channel: %s' % self.args[0]) class ConsumerCancelled(TrioAmqpException): diff --git a/trio_amqp/frame.py b/trio_amqp/frame.py index 757f083..1157dae 100644 --- a/trio_amqp/frame.py +++ b/trio_amqp/frame.py @@ -110,9 +110,7 @@ def write_value(self, value): def write_bits(self, *args): """Write consecutive bools to one byte""" - assert len( - args - ) <= 8, "write_bits can only write 8 bits into one octet, sadly" + assert len(args) <= 8, "write_bits can only write 8 bits into one octet, sadly" byte_value = 0 for arg_index, bit in enumerate(args): @@ -154,10 +152,7 @@ def write_timestamp(self, value): representing seconds since the Unix epoch. """ self.payload.write( - struct.pack( - '>Q', - int(value.replace(tzinfo=datetime.timezone.utc).timestamp()) - ) + struct.pack('>Q', int(value.replace(tzinfo=datetime.timezone.utc).timestamp())) ) def _write_string(self, string): @@ -319,9 +314,7 @@ def read_longstr(self): return data.decode() def read_timestamp(self): - return datetime.datetime.fromtimestamp( - self.read_long_long(), datetime.timezone.utc - ) + return datetime.datetime.fromtimestamp(self.read_long_long(), datetime.timezone.utc) def read_table(self): """Reads an AMQP table""" @@ -405,9 +398,7 @@ def get_frame(self, encoder): if self.frame_type == amqp_constants.TYPE_METHOD: content_header = struct.pack('!HH', self.class_id, self.method_id) elif self.frame_type == amqp_constants.TYPE_HEADER: - content_header = struct.pack( - '!HHQ', self.class_id, self.weight, self.next_body_size - ) + content_header = struct.pack('!HHQ', self.class_id, self.weight, self.next_body_size) elif self.frame_type == amqp_constants.TYPE_BODY: # no specific headers pass @@ -415,9 +406,7 @@ def get_frame(self, encoder): # no specific headers pass else: - raise Exception( - "frame_type {} not handled".format(self.frame_type) - ) + raise Exception("frame_type {} not handled".format(self.frame_type)) header = struct.pack( '!BHI', self.frame_type, self.channel, @@ -503,47 +492,33 @@ async def read_frame(self): break decoded_properties = {} if self.property_flags & amqp_constants.FLAG_CONTENT_TYPE: - decoded_properties['content_type' - ] = self.payload_decoder.read_shortstr() + decoded_properties['content_type'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_CONTENT_ENCODING: - decoded_properties['content_encoding' - ] = self.payload_decoder.read_shortstr() + decoded_properties['content_encoding'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_HEADERS: - decoded_properties['headers' - ] = self.payload_decoder.read_table() + decoded_properties['headers'] = self.payload_decoder.read_table() if self.property_flags & amqp_constants.FLAG_DELIVERY_MODE: - decoded_properties['delivery_mode' - ] = self.payload_decoder.read_octet() + decoded_properties['delivery_mode'] = self.payload_decoder.read_octet() if self.property_flags & amqp_constants.FLAG_PRIORITY: - decoded_properties['priority' - ] = self.payload_decoder.read_octet() + decoded_properties['priority'] = self.payload_decoder.read_octet() if self.property_flags & amqp_constants.FLAG_CORRELATION_ID: - decoded_properties['correlation_id' - ] = self.payload_decoder.read_shortstr() + decoded_properties['correlation_id'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_REPLY_TO: - decoded_properties['reply_to' - ] = self.payload_decoder.read_shortstr() + decoded_properties['reply_to'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_EXPIRATION: - decoded_properties['expiration' - ] = self.payload_decoder.read_shortstr() + decoded_properties['expiration'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_MESSAGE_ID: - decoded_properties['message_id' - ] = self.payload_decoder.read_shortstr() + decoded_properties['message_id'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_TIMESTAMP: - decoded_properties['timestamp' - ] = self.payload_decoder.read_long_long() + decoded_properties['timestamp'] = self.payload_decoder.read_long_long() if self.property_flags & amqp_constants.FLAG_TYPE: - decoded_properties['type' - ] = self.payload_decoder.read_shortstr() + decoded_properties['type'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_USER_ID: - decoded_properties['user_id' - ] = self.payload_decoder.read_shortstr() + decoded_properties['user_id'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_APP_ID: - decoded_properties['app_id' - ] = self.payload_decoder.read_shortstr() + decoded_properties['app_id'] = self.payload_decoder.read_shortstr() if self.property_flags & amqp_constants.FLAG_CLUSTER_ID: - decoded_properties['cluster_id' - ] = self.payload_decoder.read_shortstr() + decoded_properties['cluster_id'] = self.payload_decoder.read_shortstr() self.properties = Properties(**decoded_properties) elif self.frame_type == amqp_constants.TYPE_BODY: @@ -553,9 +528,7 @@ async def read_frame(self): pass else: - raise ValueError( - "Message type {:x} not known".format(self.frame_type) - ) + raise ValueError("Message type {:x} not known".format(self.frame_type)) self.frame_end = await self._readexactly(1) assert self.frame_end == amqp_constants.FRAME_END diff --git a/trio_amqp/protocol.py b/trio_amqp/protocol.py index 8036f44..1173e35 100644 --- a/trio_amqp/protocol.py +++ b/trio_amqp/protocol.py @@ -128,10 +128,7 @@ def __init__( if login_method != 'AMQPLAIN': # TODO - logger.warning( - 'only AMQPLAIN login_method is supported, ' - 'falling back to AMQPLAIN' - ) + logger.warning('only AMQPLAIN login_method is supported, ' 'falling back to AMQPLAIN') self._host = host self._port = port @@ -219,8 +216,7 @@ async def aclose(self, no_wait=False): # If the closing handshake is in progress, let it complete. frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, 0) frame.declare_method( - amqp_constants.CLASS_CONNECTION, - amqp_constants.CONNECTION_CLOSE + amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_CLOSE ) encoder = amqp_frame.AmqpEncoder() # we request a clean connection close @@ -292,9 +288,7 @@ async def __aenter__(self): port = 5672 if self._ssl: - stream = await trio.open_ssl_over_tcp_stream( - self._host, port, ssl_context=ssl_context - ) + stream = await trio.open_ssl_over_tcp_stream(self._host, port, ssl_context=ssl_context) sock = stream.transport_stream else: sock = stream = await trio.open_tcp_stream(self._host, port) @@ -315,11 +309,10 @@ async def __aenter__(self): raise RuntimeError("Server didn't start with a START packet") client_properties = { - 'capabilities': - { - 'consumer_cancel_notify': True, - 'connection.blocked': False, - }, + 'capabilities': { + 'consumer_cancel_notify': True, + 'connection.blocked': False, + }, 'copyright': 'BSD', 'product': _version.__package__, 'product_version': _version.__version__, @@ -327,10 +320,7 @@ async def __aenter__(self): client_properties.update(self.client_properties) # waiting reply start with credentions and co - await self.start_ok( - client_properties, 'AMQPLAIN', self._auth, - self.server_locales[0] - ) + await self.start_ok(client_properties, 'AMQPLAIN', self._auth, self.server_locales[0]) # wait for a "tune" reponse await self.dispatch_frame() @@ -339,17 +329,11 @@ async def __aenter__(self): tune_ok = { 'channel_max': - self.connection_tunning.get( - 'channel_max', self.server_channel_max - ), + self.connection_tunning.get('channel_max', self.server_channel_max), 'frame_max': - self.connection_tunning.get( - 'frame_max', self.server_frame_max - ), + self.connection_tunning.get('frame_max', self.server_frame_max), 'heartbeat': - self.connection_tunning.get( - 'heartbeat', self.server_heartbeat - ), + self.connection_tunning.get('heartbeat', self.server_heartbeat), } # "tune" the connexion with max channel, max frame, heartbeat await self.tune_ok(**tune_ok) @@ -360,9 +344,7 @@ async def __aenter__(self): self.server_heartbeat = tune_ok['heartbeat'] # open a virtualhost - await self.open( - self._virtualhost, capabilities='', insist=self._insist - ) + await self.open(self._virtualhost, capabilities='', insist=self._insist) # wait for open-ok await self.dispatch_frame() @@ -428,9 +410,7 @@ async def dispatch_frame(self, frame=None): return if (frame.class_id, frame.method_id) not in method_dispatch: - logger.info( - "frame %s %s is not handled", frame.class_id, frame.method_id - ) + logger.info("frame %s %s is not handled", frame.class_id, frame.method_id) return await method_dispatch[(frame.class_id, frame.method_id)](frame) @@ -444,9 +424,7 @@ def release_channel_id(self, channel_id): def channels_ids_count(self): return self.channels_ids_ceil - len(self.channels_ids_free) - def _close_channels( - self, reply_code=None, reply_text=None, exception=None - ): + def _close_channels(self, reply_code=None, reply_text=None, exception=None): """Cleanly close channels Args: @@ -522,9 +500,7 @@ async def start(self, frame): async def start_ok(self, client_properties, mechanism, auth, locale): frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, 0) - frame.declare_method( - amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_START_OK - ) + frame.declare_method(amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_START_OK) request = amqp_frame.AmqpEncoder() request.write_table(client_properties) request.write_shortstr(mechanism) @@ -541,17 +517,15 @@ async def server_close(self, frame): class_id = response.read_short() method_id = response.read_short() logger.warning( - "Server closed connection: %s, code=%s, class_id=%s, method_id=%s", - reply_text, reply_code, class_id, method_id + "Server closed connection: %s, code=%s, class_id=%s, method_id=%s", reply_text, + reply_code, class_id, method_id ) self._close_channels(reply_code, reply_text) await self._close_ok() async def _close_ok(self): frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, 0) - frame.declare_method( - amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_CLOSE_OK - ) + frame.declare_method(amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_CLOSE_OK) request = amqp_frame.AmqpEncoder() await self._write_frame(frame, request) await trio.sleep(0) # give the write task one shot to send the frame @@ -574,9 +548,7 @@ async def tune(self, frame): async def tune_ok(self, channel_max, frame_max, heartbeat): frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, 0) - frame.declare_method( - amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_TUNE_OK - ) + frame.declare_method(amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_TUNE_OK) encoder = amqp_frame.AmqpEncoder() encoder.write_short(channel_max) encoder.write_long(frame_max) @@ -590,9 +562,7 @@ async def secure_ok(self, login_response): async def open(self, virtual_host, capabilities='', insist=False): """Open connection to virtual host.""" frame = amqp_frame.AmqpRequest(amqp_constants.TYPE_METHOD, 0) - frame.declare_method( - amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_OPEN - ) + frame.declare_method(amqp_constants.CLASS_CONNECTION, amqp_constants.CONNECTION_OPEN) encoder = amqp_frame.AmqpEncoder() encoder.write_shortstr(virtual_host) encoder.write_shortstr(capabilities)