st2sensorcontainer behaves differently in Docker vs standalone #5743
Summary

I'm trying to get the [...]. While troubleshooting, I launched a reference install using the one-liner installer on Ubuntu 20.04, and replicated the Docker environment using st2-docker. As I dug into the debug logs for the sensor, I noticed some large variations in behavior. The sensor works perfectly in standalone. In Docker the sensor loads without reporting any errors, but it never progresses to actually interacting with RabbitMQ.

I am posting this in case someone has ideas or has run into this issue, while I continue to troubleshoot. I will update as I make progress.

Replication details

While I'm trying to get this all working in Nomad, I am using https://github.com/StackStorm/st2-docker as a test platform for Docker, to make the issue easier to replicate.

Pack Config

---
sensor_config:
  host: <host>
  username: <user>
  password: <pass>
  rabbitmq_queue_sensor:
    queues:
      - platform_st2_jobs
    deserialization_method: json

Docker

git clone https://github.com/StackStorm/st2-docker
cd st2-docker
docker-compose up -d
docker-compose exec st2client bash
sudo sh -c "cat <<EOF >> /opt/stackstorm/configs/rabbitmq.yaml
---
sensor_config:
  host: rabbitmq:5672
  username: guest
  password: guest
  rabbitmq_queue_sensor:
    queues:
      - platform_st2_jobs
    deserialization_method: json
EOF"
export ST2_AUTH_TOKEN=$(st2 auth -t st2admin)
Password:
st2 pack install rabbitmq
<wait for install to finish>
exit
docker-compose exec st2sensorcontainer bash
sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --config-file=/etc/st2/st2.docker.conf --config-file=/etc/st2/st2.user.conf --debug --sensor-ref=rabbitmq.RabbitMQQueueSensor

(Logs located in logs section)

Reference Install

bash <(curl -sSL https://stackstorm.com/packages/install.sh) --user=st2admin --password=Ch@ngeMe
<wait for install to finish>
rabbit_pw=$(cat /etc/st2/st2.conf | grep amqp | awk '{print $3}' | cut -d: -f 3 | cut -d @ -f 1)
sudo sh -c "cat <<EOF >> /opt/stackstorm/configs/rabbitmq.yaml
---
sensor_config:
  host: localhost:5672
  username: stackstorm
  password: $rabbit_pw
  rabbitmq_queue_sensor:
    queues:
      - platform_st2_jobs
    deserialization_method: json
EOF"
export ST2_AUTH_TOKEN=$(st2 auth -t st2admin)
Password:
st2 pack install rabbitmq
<wait for install to finish>
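sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --debug --sensor-ref=rabbitmq.RabbitMQQueueSensor

(Logs located in logs section)

Interesting Logs

Python Version

Docker: 3.6.9 (from the "Using Python" line in the full Docker log)

Stand Alone: 3.8.10 (from the "Using Python" line in the full standalone log)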

Major Variation 1

Standalone actually has a log line indicating an AMQP connection.

Docker Sensor

2022-09-22 23:36:33,286 INFO [-] Running sensor initialization code
2022-09-22 23:36:33,286 DEBUG [-] Using PollPoller
2022-09-22 23:36:33,291 DEBUG [-] Created default connection workflow <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>
2022-09-22 23:36:33,292 DEBUG [-] Starting AMQP Connection workflow asynchronously.
2022-09-22 23:36:33,292 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f0a3bf03550> with deadline=28202.522130639 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>>, first=True); now=28202.522130639; delay=0
2022-09-22 23:36:33,292 DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
2022-09-22 23:36:33,292 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f0a3bf03588> with deadline=28202.522581713 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>>; now=28202.522581713; delay=0
2022-09-22 23:36:33,292 DEBUG [-] _try_next_config_async: 'rabbitmq:5672':5672
2022-09-22 23:36:33,295 DEBUG [-] Found existing trigger: TriggerDB(description=None, id=632cf094a814c8713a1b0287, metadata_file=None, name="new_message", pack="rabbitmq", parameters={}, ref="rabbitmq.new_message", ref_count=0, type="rabbitmq.new_message", uid="trigger:rabbitmq:new_message:99914b932bd37a50b983c5e7c90ae93b") in db.
2022-09-22 23:36:33,295 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.new_message)

Standalone Sensor

2022-09-22 23:52:03,650 INFO [-] Running sensor initialization code
2022-09-22 23:52:03,650 DEBUG [-] Using PollPoller
2022-09-22 23:52:03,658 DEBUG [-] Created default connection workflow <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>
2022-09-22 23:52:03,659 DEBUG [-] Starting AMQP Connection workflow asynchronously.
2022-09-22 23:52:03,659 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a4550> with deadline=10300.86587002 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>>, first=True); now=10300.86587002; delay=0
2022-09-22 23:52:03,659 DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
2022-09-22 23:52:03,660 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a4580> with deadline=10300.866654604 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>>; now=10300.866654604; delay=0
2022-09-22 23:52:03,660 DEBUG [-] _try_next_config_async: 'localhost':5672
2022-09-22 23:52:03,661 DEBUG [-] add_callback_threadsafe: added callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fabdd9a45b0>>
2022-09-22 23:52:03,661 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:03,663 INFO [-] Connected to amqp://stackstorm:**@127.0.0.1:5672//
2022-09-22 23:52:03,663 DEBUG [-] using channel_id: 1
2022-09-22 23:52:03,664 DEBUG [-] Channel open
2022-09-22 23:52:03,667 DEBUG [-] Found existing trigger: TriggerDB(description=None, id=632ccebfea32b3fad805a165, metadata_file=None, name="new_message", pack="rabbitmq", parameters={}, ref="rabbitmq.new_message", ref_count=0, type="rabbitmq.new_message", uid="trigger:rabbitmq:new_message:99914b932bd37a50b983c5e7c90ae93b") in db.
2022-09-22 23:52:03,668 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.new_message)

Major Variation 2

After the standalone sensor connects to RabbitMQ, pika goes wild and does lots of things. The Docker sensor does... nothing.

Docker Sensor
Standalone Sensor
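
For anyone who wants to compare the two Major Variation 1 excerpts mechanically rather than by eye, here is a minimal sketch. The helper name `summarize_sensor_log`, the dictionary keys, and the choice of `_AddressResolver._dispatch_result` as a "connection attempt progressed" marker are assumptions made for illustration. Only the log line formats are taken from the excerpts above. It extracts the target that pika logged in `_try_next_config_async` and reports whether the address-resolver callback line ever appears afterwards.

```python
import re

# Matches pika's debug line, e.g.:  _try_next_config_async: 'rabbitmq:5672':5672
TARGET_RE = re.compile(r"_try_next_config_async: '(?P<host>[^']*)':(?P<port>\d+)")

# Assumption: this callback line (present only in the standalone excerpt) is
# treated as a sign that the connection attempt moved past its first step.
RESOLVER_MARKER = "_AddressResolver._dispatch_result"


def summarize_sensor_log(log_text):
    """Summarize the first connection target and whether the resolver callback was logged."""
    summary = {
        "host": None,
        "port": None,
        "host_contains_colon": False,
        "resolver_callback_seen": False,
    }
    for line in log_text.splitlines():
        match = TARGET_RE.search(line)
        if match and summary["host"] is None:
            summary["host"] = match.group("host")
            summary["port"] = int(match.group("port"))
            # A colon inside the host part means the port was logged twice.
            summary["host_contains_colon"] = ":" in summary["host"]
        if RESOLVER_MARKER in line:
            summary["resolver_callback_seen"] = True
    return summary


# Lines copied from the Docker and standalone excerpts above.
docker_excerpt = """\
2022-09-22 23:36:33,292 DEBUG [-] _try_next_config_async: 'rabbitmq:5672':5672
2022-09-22 23:36:33,295 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.new_message)
"""
standalone_excerpt = """\
2022-09-22 23:52:03,660 DEBUG [-] _try_next_config_async: 'localhost':5672
2022-09-22 23:52:03,661 DEBUG [-] add_callback_threadsafe: added callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fabdd9a45b0>>
"""

print("docker:    ", summarize_sensor_log(docker_excerpt))
print("standalone:", summarize_sensor_log(standalone_excerpt))
```

On these excerpts the Docker summary shows the host logged as `rabbitmq:5672` with no resolver callback, while the standalone summary shows `localhost` with the callback present. Whether the port embedded in the host string is related to the stall has not been confirmed, but it is one concrete difference between the two runs that could be checked.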

Key indicator of success

Whenever the sensor works successfully, you will see the log message of [...]

This event does not happen in Docker.

Next Steps

It's clear to me that something is happening between StackStorm's sensor wrapper, the sensor code, and most notably [...]

I would be open to any suggestions on how to dig deeper into this.

Full Logs

Docker Instance

Docker Sensor
2022-09-22 23:36:31,607 INFO [-] Using Python: 3.6.9 (/opt/stackstorm/st2/bin/python)
2022-09-22 23:36:31,608 INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: en_US.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
2022-09-22 23:36:31,608 INFO [-] Using config files: /etc/st2/st2.conf,/etc/st2/st2.docker.conf,/etc/st2/st2.user.conf
2022-09-22 23:36:31,608 INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
2022-09-22 23:36:31,608 INFO [-] Using coordination driver: redis
2022-09-22 23:36:31,608 INFO [-] Using metrics driver: noop
2022-09-22 23:36:31,618 INFO [-] Connecting to database "st2" @ "mongo:27017" as user "None".
2022-09-22 23:36:31,628 INFO [-] Successfully connected to database "st2" @ "mongo:27017" as user "None".
2022-09-22 23:36:31,628 DEBUG [-] Ensuring database indexes...
2022-09-22 23:36:31,724 DEBUG [-] Skipping index cleanup for blacklisted model "PermissionGrantDB"...
2022-09-22 23:36:31,771 DEBUG [-] Indexes are ensured for models: ActionAliasDB, ActionAliasDB, ActionDB, ActionExecutionDB, ActionExecutionDB, ActionExecutionOutputDB, ActionExecutionSchedulingQueueItemDB, ActionExecutionStateDB, ActionExecutionStateDB, ApiKeyDB, ConfigDB, ConfigSchemaDB, GroupToRoleMappingDB, KeyValuePairDB, LiveActionDB, LiveActionDB, PackDB, PermissionGrantDB, PolicyDB, PolicyTypeDB, RoleDB, RuleDB, RuleEnforcementDB, RunnerTypeDB, RunnerTypeDB, SensorTypeDB, TaskExecutionDB, TokenDB, TraceDB, TriggerDB, TriggerInstanceDB, TriggerTypeDB, UserDB, UserRoleAssignmentDB, WorkflowExecutionDB
2022-09-22 23:36:31,771 DEBUG [-] Registering exchanges...
2022-09-22 23:36:31,772 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:36:31,783 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@ccace9b3bfbf', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.3', 'product': 'RabbitMQ', 'version': '3.8.30'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2022-09-22 23:36:31,785 DEBUG [-] using channel_id: 1
2022-09-22 23:36:31,786 DEBUG [-] Channel open
2022-09-22 23:36:31,786 DEBUG [-] Registered exchange st2.actionexecutionstate ({'exchange': 'st2.actionexecutionstate', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,787 DEBUG [-] Registered exchange st2.announcement ({'exchange': 'st2.announcement', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,787 DEBUG [-] Registered exchange st2.execution ({'exchange': 'st2.execution', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,788 DEBUG [-] Registered exchange st2.liveaction ({'exchange': 'st2.liveaction', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,788 DEBUG [-] Registered exchange st2.liveaction.status ({'exchange': 'st2.liveaction.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,789 DEBUG [-] Registered exchange st2.trigger ({'exchange': 'st2.trigger', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,789 DEBUG [-] Registered exchange st2.trigger_instances_dispatch ({'exchange': 'st2.trigger_instances_dispatch', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,790 DEBUG [-] Registered exchange st2.sensor ({'exchange': 'st2.sensor', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,790 DEBUG [-] Registered exchange st2.workflow ({'exchange': 'st2.workflow', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,791 DEBUG [-] Registered exchange st2.workflow.status ({'exchange': 'st2.workflow.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:36:31,791 DEBUG [-] Closed channel #1
2022-09-22 23:36:31,791 DEBUG [-] using channel_id: 1
2022-09-22 23:36:31,792 DEBUG [-] Channel open
2022-09-22 23:36:31,792 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,794 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,794 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,795 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,795 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,797 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:36:31,797 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2022-09-22 23:36:31,798 DEBUG [-] Predeclared queue for exchange "st2.execution"
2022-09-22 23:36:31,798 DEBUG [-] Predeclaring queue for exchange "st2.actionexecutionstate"
2022-09-22 23:36:31,799 DEBUG [-] Predeclared queue for exchange "st2.actionexecutionstate"
2022-09-22 23:36:31,800 DEBUG [-] Predeclaring queue for exchange "st2.trigger_instances_dispatch"
2022-09-22 23:36:31,801 DEBUG [-] Predeclared queue for exchange "st2.trigger_instances_dispatch"
2022-09-22 23:36:31,801 DEBUG [-] Predeclaring queue for exchange "st2.announcement"
2022-09-22 23:36:31,806 DEBUG [-] Predeclared queue for exchange "st2.announcement"
2022-09-22 23:36:31,806 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2022-09-22 23:36:31,811 DEBUG [-] Predeclared queue for exchange "st2.execution"
2022-09-22 23:36:31,811 DEBUG [-] Predeclaring queue for exchange "st2.liveaction"
2022-09-22 23:36:31,816 DEBUG [-] Predeclared queue for exchange "st2.liveaction"
2022-09-22 23:36:31,816 DEBUG [-] Predeclaring queue for exchange "st2.execution.output"
2022-09-22 23:36:31,821 DEBUG [-] Predeclared queue for exchange "st2.execution.output"
2022-09-22 23:36:31,821 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2022-09-22 23:36:31,823 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2022-09-22 23:36:31,823 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2022-09-22 23:36:31,825 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2022-09-22 23:36:31,826 DEBUG [-] Predeclaring queue for exchange "st2.trigger"
2022-09-22 23:36:31,828 DEBUG [-] Predeclared queue for exchange "st2.trigger"
2022-09-22 23:36:31,828 DEBUG [-] Predeclaring queue for exchange "st2.sensor"
2022-09-22 23:36:31,831 DEBUG [-] Predeclared queue for exchange "st2.sensor"
2022-09-22 23:36:31,832 DEBUG [-] Closed channel #1
2022-09-22 23:36:31,837 DEBUG [-] Inserting system roles (['system_admin', 'admin', 'observer'])
2022-09-22 23:36:31,862 DEBUG [-] found extension EntryPoint.parse('echo = st2common.metrics.drivers.echo_driver:EchoDriver')
2022-09-22 23:36:31,863 DEBUG [-] found extension EntryPoint.parse('noop = st2common.metrics.drivers.noop_driver:NoopDriver')
2022-09-22 23:36:31,864 DEBUG [-] found extension EntryPoint.parse('statsd = st2common.metrics.drivers.statsd_driver:StatsdDriver')
2022-09-22 23:36:31,864 INFO [-] Running in single sensor mode, using a single sensor partitioner...
2022-09-22 23:36:31,866 INFO [-] Setting up container to run 1 sensors.
2022-09-22 23:36:31,866 INFO [-] Sensors list - ['rabbitmq.RabbitMQQueueSensor'].
2022-09-22 23:36:31,867 INFO [-] (PID:36) SensorContainer started.
2022-09-22 23:36:31,867 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:36:31,867 DEBUG [-] Starting sensor CUD watcher...
2022-09-22 23:36:31,867 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:36:31,867 INFO [-] Running sensor rabbitmq.RabbitMQQueueSensor
2022-09-22 23:36:31,868 DEBUG [-] Creating temporary auth token for sensor RabbitMQQueueSensor
2022-09-22 23:36:31,875 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@ccace9b3bfbf', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.3', 'product': 'RabbitMQ', 'version': '3.8.30'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2022-09-22 23:36:31,876 AUDIT [-] Access granted to "sensors_container" with the token set to expire at "2022-09-23T23:36:31.874959Z". (username='sensors_container',token_expiration='2022-09-23T23:36:31.874959Z')
2022-09-22 23:36:31,877 DEBUG [-] Running sensor subprocess (cmd="/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.6/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/queues_sensor.py --class-name=RabbitMQQueueSensor --trigger-type-refs=rabbitmq.new_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--debug", "--sensor-ref=rabbitmq.RabbitMQQueueSensor"]")
2022-09-22 23:36:31,882 DEBUG [-] Dispatching trigger (trigger=core.st2.sensor.process_spawn,payload={'trigger': 'core.st2.sensor.process_spawn', 'payload': {'id': 'RabbitMQQueueSensor', 'timestamp': 1663889791, 'pid': 40, 'cmd': '/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.6/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/queues_sensor.py --class-name=RabbitMQQueueSensor --trigger-type-refs=rabbitmq.new_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--debug", "--sensor-ref=rabbitmq.RabbitMQQueueSensor"]'}, 'trace_context': None})
2022-09-22 23:36:31,887 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2022-09-22 23:36:31,887 DEBUG [-] using channel_id: 1
2022-09-22 23:36:31,889 DEBUG [-] Channel open
2022-09-22 23:36:31,890 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@ccace9b3bfbf', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.3', 'product': 'RabbitMQ', 'version': '3.8.30'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2022-09-22 23:36:31,891 DEBUG [-] using channel_id: 1
2022-09-22 23:36:31,892 DEBUG [-] Channel open
2022-09-22 23:36:31,894 DEBUG [-] Closed channel #1
2022-09-22 23:36:31,894 INFO [-] Sensor rabbitmq.RabbitMQQueueSensor started
2022-09-22 23:36:31,894 DEBUG [-] 1 active sensor(s)
2022-09-22 23:36:33,275 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:36:33,285 INFO [-] Found config for sensor "RabbitMQQueueSensor"
2022-09-22 23:36:33,285 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:36:33,286 INFO [-] Watcher started
2022-09-22 23:36:33,286 INFO [-] Running sensor initialization code
2022-09-22 23:36:33,286 DEBUG [-] Using PollPoller
2022-09-22 23:36:33,291 DEBUG [-] Created default connection workflow <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>
2022-09-22 23:36:33,292 DEBUG [-] Starting AMQP Connection workflow asynchronously.
2022-09-22 23:36:33,292 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f0a3bf03550> with deadline=28202.522130639 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>>, first=True); now=28202.522130639; delay=0
2022-09-22 23:36:33,292 DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
2022-09-22 23:36:33,292 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f0a3bf03588> with deadline=28202.522581713 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f0a3bf03518>>; now=28202.522581713; delay=0
2022-09-22 23:36:33,292 DEBUG [-] _try_next_config_async: 'rabbitmq:5672':5672
2022-09-22 23:36:33,295 DEBUG [-] Found existing trigger: TriggerDB(description=None, id=632cf094a814c8713a1b0287, metadata_file=None, name="new_message", pack="rabbitmq", parameters={}, ref="rabbitmq.new_message", ref_count=0, type="rabbitmq.new_message", uid="trigger:rabbitmq:new_message:99914b932bd37a50b983c5e7c90ae93b") in db.
2022-09-22 23:36:33,295 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.new_message)
2022-09-22 23:36:36,895 DEBUG [-] 1 active sensor(s)
2022-09-22 23:36:41,897 DEBUG [-] 1 active sensor(s)
2022-09-22 23:36:46,898 DEBUG [-] 1 active sensor(s)
2022-09-22 23:36:51,877 DEBUG [-] 1 active sensor(s)
2022-09-22 23:36:56,878 DEBUG [-] 1 active sensor(s)
2022-09-22 23:37:01,880 DEBUG [-] 1 active sensor(s)
2022-09-22 23:37:06,881 DEBUG [-] 1 active sensor(s)
^C2022-09-22 23:37:11,599 INFO [-] Container shutting down. Invoking cleanup on sensors.
2022-09-22 23:37:11,881 ERROR [-] Process container stopped.
2022-09-22 23:37:12,602 INFO [-] All sensors are shut down.
2022-09-22 23:37:12,603 DEBUG [-] Shutting down sensor watcher.
2022-09-22 23:37:12,610 DEBUG [-] Closed channel #1
2022-09-22 23:37:12,611 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@ccace9b3bfbf', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.3', 'product': 'RabbitMQ', 'version': '3.8.30'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2022-09-22 23:37:12,612 DEBUG [-] using channel_id: 1
2022-09-22 23:37:12,614 DEBUG [-] Channel open
2022-09-22 23:37:12,618 DEBUG [-] Closed channel #1
2022-09-22 23:37:12,618 INFO [-] (PID:36) SensorContainer stopped. Reason - KeyboardInterrupt

Standalone Ref Install

Standalone Ref Install Sensor
2022-09-22 23:52:02,512 INFO [-] Using Python: 3.8.10 (/opt/stackstorm/st2/bin/python)
2022-09-22 23:52:02,513 INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: C.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
2022-09-22 23:52:02,513 INFO [-] Using config files: /etc/st2/st2.conf
2022-09-22 23:52:02,513 INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
2022-09-22 23:52:02,513 INFO [-] Using coordination driver: redis
2022-09-22 23:52:02,514 INFO [-] Using metrics driver: noop
2022-09-22 23:52:02,525 INFO [-] Connecting to database "st2" @ "127.0.0.1:27017" as user "stackstorm".
2022-09-22 23:52:02,531 INFO [-] Successfully connected to database "st2" @ "127.0.0.1:27017" as user "stackstorm".
2022-09-22 23:52:02,531 DEBUG [-] Ensuring database indexes...
2022-09-22 23:52:02,663 DEBUG [-] Skipping index cleanup for blacklisted model "PermissionGrantDB"...
2022-09-22 23:52:02,720 DEBUG [-] Indexes are ensured for models: ActionAliasDB, ActionAliasDB, ActionDB, ActionExecutionDB, ActionExecutionDB, ActionExecutionOutputDB, ActionExecutionSchedulingQueueItemDB, ActionExecutionStateDB, ActionExecutionStateDB, ApiKeyDB, ConfigDB, ConfigSchemaDB, GroupToRoleMappingDB, KeyValuePairDB, LiveActionDB, LiveActionDB, PackDB, PermissionGrantDB, PolicyDB, PolicyTypeDB, RoleDB, RuleDB, RuleEnforcementDB, RunnerTypeDB, RunnerTypeDB, SensorTypeDB, TaskExecutionDB, TokenDB, TraceDB, TriggerDB, TriggerInstanceDB, TriggerTypeDB, UserDB, UserRoleAssignmentDB, WorkflowExecutionDB
2022-09-22 23:52:02,720 DEBUG [-] Registering exchanges...
2022-09-22 23:52:02,721 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:52:02,731 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:02,732 DEBUG [-] using channel_id: 1
2022-09-22 23:52:02,733 DEBUG [-] Channel open
2022-09-22 23:52:02,735 DEBUG [-] Registered exchange st2.actionexecutionstate ({'exchange': 'st2.actionexecutionstate', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,736 DEBUG [-] Registered exchange st2.announcement ({'exchange': 'st2.announcement', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,736 DEBUG [-] Registered exchange st2.execution ({'exchange': 'st2.execution', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,737 DEBUG [-] Registered exchange st2.liveaction ({'exchange': 'st2.liveaction', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,738 DEBUG [-] Registered exchange st2.liveaction.status ({'exchange': 'st2.liveaction.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,738 DEBUG [-] Registered exchange st2.trigger ({'exchange': 'st2.trigger', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,739 DEBUG [-] Registered exchange st2.trigger_instances_dispatch ({'exchange': 'st2.trigger_instances_dispatch', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,739 DEBUG [-] Registered exchange st2.sensor ({'exchange': 'st2.sensor', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,740 DEBUG [-] Registered exchange st2.workflow ({'exchange': 'st2.workflow', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,741 DEBUG [-] Registered exchange st2.workflow.status ({'exchange': 'st2.workflow.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2022-09-22 23:52:02,742 DEBUG [-] Closed channel #1
2022-09-22 23:52:02,742 DEBUG [-] using channel_id: 1
2022-09-22 23:52:02,743 DEBUG [-] Channel open
2022-09-22 23:52:02,743 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,745 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,745 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,747 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,747 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,749 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2022-09-22 23:52:02,749 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2022-09-22 23:52:02,750 DEBUG [-] Predeclared queue for exchange "st2.execution"
2022-09-22 23:52:02,751 DEBUG [-] Predeclaring queue for exchange "st2.actionexecutionstate"
2022-09-22 23:52:02,752 DEBUG [-] Predeclared queue for exchange "st2.actionexecutionstate"
2022-09-22 23:52:02,752 DEBUG [-] Predeclaring queue for exchange "st2.trigger_instances_dispatch"
2022-09-22 23:52:02,754 DEBUG [-] Predeclared queue for exchange "st2.trigger_instances_dispatch"
2022-09-22 23:52:02,754 DEBUG [-] Predeclaring queue for exchange "st2.announcement"
2022-09-22 23:52:02,758 DEBUG [-] Predeclared queue for exchange "st2.announcement"
2022-09-22 23:52:02,758 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2022-09-22 23:52:02,761 DEBUG [-] Predeclared queue for exchange "st2.execution"
2022-09-22 23:52:02,762 DEBUG [-] Predeclaring queue for exchange "st2.liveaction"
2022-09-22 23:52:02,765 DEBUG [-] Predeclared queue for exchange "st2.liveaction"
2022-09-22 23:52:02,766 DEBUG [-] Predeclaring queue for exchange "st2.execution.output"
2022-09-22 23:52:02,769 DEBUG [-] Predeclared queue for exchange "st2.execution.output"
2022-09-22 23:52:02,769 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2022-09-22 23:52:02,771 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2022-09-22 23:52:02,771 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2022-09-22 23:52:02,773 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2022-09-22 23:52:02,773 DEBUG [-] Predeclaring queue for exchange "st2.trigger"
2022-09-22 23:52:02,774 DEBUG [-] Predeclared queue for exchange "st2.trigger"
2022-09-22 23:52:02,775 DEBUG [-] Predeclaring queue for exchange "st2.sensor"
2022-09-22 23:52:02,776 DEBUG [-] Predeclared queue for exchange "st2.sensor"
2022-09-22 23:52:02,776 DEBUG [-] Closed channel #1
2022-09-22 23:52:02,785 DEBUG [-] Inserting system roles (['system_admin', 'admin', 'observer'])
2022-09-22 23:52:02,803 DEBUG [-] found extension EntryPoint.parse('echo = st2common.metrics.drivers.echo_driver:EchoDriver')
2022-09-22 23:52:02,804 DEBUG [-] found extension EntryPoint.parse('noop = st2common.metrics.drivers.noop_driver:NoopDriver')
2022-09-22 23:52:02,805 DEBUG [-] found extension EntryPoint.parse('statsd = st2common.metrics.drivers.statsd_driver:StatsdDriver')
2022-09-22 23:52:02,805 INFO [-] Running in single sensor mode, using a single sensor partitioner...
2022-09-22 23:52:02,807 INFO [-] Setting up container to run 1 sensors.
2022-09-22 23:52:02,808 INFO [-] Sensors list - ['rabbitmq.RabbitMQQueueSensor'].
2022-09-22 23:52:02,808 INFO [-] (PID:39632) SensorContainer started.
2022-09-22 23:52:02,808 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:52:02,809 DEBUG [-] Starting sensor CUD watcher...
2022-09-22 23:52:02,809 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:52:02,810 INFO [-] Running sensor rabbitmq.RabbitMQQueueSensor
2022-09-22 23:52:02,810 DEBUG [-] Creating temporary auth token for sensor RabbitMQQueueSensor
2022-09-22 23:52:02,819 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:02,821 AUDIT [-] Access granted to "sensors_container" with the token set to expire at "2022-09-23T23:52:02.818987Z". (username='sensors_container',token_expiration='2022-09-23T23:52:02.818987Z')
2022-09-22 23:52:02,822 WARNING [-] "auth.api_url" configuration option is not configured
2022-09-22 23:52:02,822 DEBUG [-] Running sensor subprocess (cmd="/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/queues_sensor.py --class-name=RabbitMQQueueSensor --trigger-type-refs=rabbitmq.new_message --parent-args=["--config-file=/etc/st2/st2.conf", "--debug", "--sensor-ref=rabbitmq.RabbitMQQueueSensor"]")
2022-09-22 23:52:02,827 DEBUG [-] Dispatching trigger (trigger=core.st2.sensor.process_spawn,payload={'trigger': 'core.st2.sensor.process_spawn', 'payload': {'id': 'RabbitMQQueueSensor', 'timestamp': 1663890722, 'pid': 39644, 'cmd': '/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/queues_sensor.py --class-name=RabbitMQQueueSensor --trigger-type-refs=rabbitmq.new_message --parent-args=["--config-file=/etc/st2/st2.conf", "--debug", "--sensor-ref=rabbitmq.RabbitMQQueueSensor"]'}, 'trace_context': None})
2022-09-22 23:52:02,842 INFO [-] Connected to amqp://stackstorm:**@127.0.0.1:5672//
2022-09-22 23:52:02,842 DEBUG [-] using channel_id: 1
2022-09-22 23:52:02,851 DEBUG [-] Channel open
2022-09-22 23:52:02,852 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:02,853 DEBUG [-] using channel_id: 1
2022-09-22 23:52:02,855 DEBUG [-] Channel open
2022-09-22 23:52:02,857 DEBUG [-] Closed channel #1
2022-09-22 23:52:02,858 INFO [-] Sensor rabbitmq.RabbitMQQueueSensor started
2022-09-22 23:52:02,858 DEBUG [-] 1 active sensor(s)
2022-09-22 23:52:03,641 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:52:03,648 INFO [-] Found config for sensor "RabbitMQQueueSensor"
2022-09-22 23:52:03,649 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2022-09-22 23:52:03,649 INFO [-] Watcher started
2022-09-22 23:52:03,650 INFO [-] Running sensor initialization code
2022-09-22 23:52:03,650 DEBUG [-] Using PollPoller
2022-09-22 23:52:03,658 DEBUG [-] Created default connection workflow <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>
2022-09-22 23:52:03,659 DEBUG [-] Starting AMQP Connection workflow asynchronously.
2022-09-22 23:52:03,659 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a4550> with deadline=10300.86587002 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>>, first=True); now=10300.86587002; delay=0
2022-09-22 23:52:03,659 DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
2022-09-22 23:52:03,660 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a4580> with deadline=10300.866654604 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fabdd9a4520>>; now=10300.866654604; delay=0
2022-09-22 23:52:03,660 DEBUG [-] _try_next_config_async: 'localhost':5672
2022-09-22 23:52:03,661 DEBUG [-] add_callback_threadsafe: added callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fabdd9a45b0>>
2022-09-22 23:52:03,661 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:03,663 INFO [-] Connected to amqp://stackstorm:**@127.0.0.1:5672//
2022-09-22 23:52:03,663 DEBUG [-] using channel_id: 1
2022-09-22 23:52:03,664 DEBUG [-] Channel open
2022-09-22 23:52:03,667 DEBUG [-] Found existing trigger: TriggerDB(description=None, id=632ccebfea32b3fad805a165, metadata_file=None, name="new_message", pack="rabbitmq", parameters={}, ref="rabbitmq.new_message", ref_count=0, type="rabbitmq.new_message", uid="trigger:rabbitmq:new_message:99914b932bd37a50b983c5e7c90ae93b") in db.
2022-09-22 23:52:03,668 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.new_message)
2022-09-22 23:52:03,668 DEBUG [-] process_timeouts: invoking callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fabdd9a45b0>>
2022-09-22 23:52:03,668 DEBUG [-] Invoking asynchronous getaddrinfo() completion callback; host='localhost'
2022-09-22 23:52:03,669 DEBUG [-] getaddrinfo returned 1 records
2022-09-22 23:52:03,669 DEBUG [-] Attempting to connect using address record (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
2022-09-22 23:52:03,669 INFO [-] Pika version 1.2.1 connecting to ('127.0.0.1', 5672)
2022-09-22 23:52:03,669 DEBUG [-] add_callback_threadsafe: added callback=<bound method _AsyncSocketConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fabdd9a43d0>>
2022-09-22 23:52:03,670 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd988e50> with deadline=10310.876740134 and callback=<bound method AMQPConnector._on_tcp_connection_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>; now=10300.876740134; delay=10.0
2022-09-22 23:52:03,670 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd988d00> with deadline=10315.877011888 and callback=<bound method AMQPConnector._on_overall_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>; now=10300.877011888; delay=15.0
2022-09-22 23:52:03,670 DEBUG [-] process_timeouts: invoking callback=<bound method _AsyncSocketConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fabdd9a43d0>>
2022-09-22 23:52:03,671 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncSocketConnector._on_writable of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fabdd9a43d0>>)
2022-09-22 23:52:03,671 DEBUG [-] set_writer(9, _) added handler Wr
2022-09-22 23:52:03,671 DEBUG [-] Connection-establishment is in progress for <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>.
2022-09-22 23:52:03,672 INFO [-] Socket connected: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,672 DEBUG [-] _AsyncSocketConnector._report_completion(None); <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,672 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,672 DEBUG [-] remove_writer(9) removed handler
2022-09-22 23:52:03,673 DEBUG [-] remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd988e50> with deadline=10310.876740134 and callback=<bound method AMQPConnector._on_tcp_connection_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>
2022-09-22 23:52:03,673 DEBUG [-] TCP connection to broker established: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>.
2022-09-22 23:52:03,673 DEBUG [-] _AsyncStreamConnector.start(); <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,673 DEBUG [-] add_callback_threadsafe: added callback=<bound method _AsyncStreamConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncStreamConnector object at 0x7fabdd9a40a0>>
2022-09-22 23:52:03,674 DEBUG [-] process_timeouts: invoking callback=<bound method _AsyncStreamConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncStreamConnector object at 0x7fabdd9a40a0>>
2022-09-22 23:52:03,674 DEBUG [-] _AsyncStreamConnector._start_async(); <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,674 DEBUG [-] _AsyncStreamConnector._linkup()
2022-09-22 23:52:03,674 DEBUG [-] New Connection state: CLOSED (prev=CLOSED)
2022-09-22 23:52:03,675 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_start of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,675 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_close_from_broker of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,675 DEBUG [-] Added: {'callback': <bound method Connection._default_on_connection_error of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,675 DEBUG [-] New Connection state: INIT (prev=CLOSED)
2022-09-22 23:52:03,676 DEBUG [-] Using external connection workflow.
2022-09-22 23:52:03,676 DEBUG [-] _AsyncTransportBase.__init__: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,676 DEBUG [-] SelectorIOServicesAdapter.set_reader(9, <bound method _AsyncPlaintextTransport._on_socket_readable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,676 DEBUG [-] set_reader(9, _) added handler Rd
2022-09-22 23:52:03,677 DEBUG [-] _linkup(): created transport <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>
2022-09-22 23:52:03,677 DEBUG [-] New Connection state: PROTOCOL (prev=INIT)
2022-09-22 23:52:03,677 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,677 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,678 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,678 DEBUG [-] _linkup(): introduced transport to protocol <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>; _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,678 DEBUG [-] _AsyncStreamConnector._report_completion((<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>)); <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,678 INFO [-] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-09-22 23:52:03,679 DEBUG [-] Removing callback #0: {'callback': <bound method Connection._default_on_connection_error of <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,679 DEBUG [-] Added: {'callback': <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,679 DEBUG [-] Added: {'callback': <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,679 DEBUG [-] _AsyncStreamConnector._cleanup(False)
2022-09-22 23:52:03,680 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,680 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,681 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,681 DEBUG [-] Processing 0:Connection.Start
2022-09-22 23:52:03,682 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,682 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,682 DEBUG [-] Removing callback #0: {'callback': <bound method Connection._on_connection_start of <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,682 DEBUG [-] Calling <bound method Connection._on_connection_start of <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>> for "0:Connection.Start"
2022-09-22 23:52:03,682 DEBUG [-] New Connection state: START (prev=PROTOCOL)
2022-09-22 23:52:03,683 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,683 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,683 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,684 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,684 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,684 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,685 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,685 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,685 DEBUG [-] Processing 0:Connection.Tune
2022-09-22 23:52:03,685 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,685 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,686 DEBUG [-] Removing callback #0: {'callback': <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,686 DEBUG [-] Calling <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>> for "0:Connection.Tune"
2022-09-22 23:52:03,686 DEBUG [-] New Connection state: TUNE (prev=START)
2022-09-22 23:52:03,686 DEBUG [-] Creating a HeartbeatChecker: 60
2022-09-22 23:52:03,687 DEBUG [-] timeout: 60.000000 send_interval: 30.000000 check_interval: 65.000000
2022-09-22 23:52:03,687 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a4970> with deadline=10330.893890931 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fabdd9a48e0>>; now=10300.893890931; delay=30.0
2022-09-22 23:52:03,687 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd9a47c0> with deadline=10365.894188715 and callback=<bound method HeartbeatChecker._check_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fabdd9a48e0>>; now=10300.894188715; delay=65
2022-09-22 23:52:03,687 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,688 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,688 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,688 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,688 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,690 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,690 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,690 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,691 DEBUG [-] Processing 0:Connection.OpenOk
2022-09-22 23:52:03,691 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,691 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,691 DEBUG [-] Removing callback #0: {'callback': <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,691 DEBUG [-] Calling <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>> for "0:Connection.OpenOk"
2022-09-22 23:52:03,692 DEBUG [-] New Connection state: OPEN (prev=TUNE)
2022-09-22 23:52:03,692 DEBUG [-] Processing 0:_on_connection_open_ok
2022-09-22 23:52:03,692 DEBUG [-] Calling <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>> for "0:_on_connection_open_ok"
2022-09-22 23:52:03,692 DEBUG [-] AMQPConnector: AMQP handshake attempt completed; state=3; error=None; 'localhost'/(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
2022-09-22 23:52:03,692 DEBUG [-] AMQPConnector: AMQP connection established for 'localhost'/(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672)): <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,693 INFO [-] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,693 DEBUG [-] remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fabdd988d00> with deadline=10315.877011888 and callback=<bound method AMQPConnector._on_overall_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fabdd9a4040>>
2022-09-22 23:52:03,693 DEBUG [-] Connection attempt completed with <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,694 INFO [-] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,694 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,694 INFO [-] Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-09-22 23:52:03,694 DEBUG [-] Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabddb0f440>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,695 DEBUG [-] Creating channel 1
2022-09-22 23:52:03,695 DEBUG [-] Added: {'callback': <bound method Connection._on_channel_cleanup of <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': <Channel number=1 CLOSED conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,695 DEBUG [-] Added: {'callback': <bound method Channel._on_getempty of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,695 DEBUG [-] Added: {'callback': <bound method Channel._on_cancel of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,696 DEBUG [-] Added: {'callback': <bound method Channel._on_flow of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,696 DEBUG [-] Added: {'callback': <bound method Channel._on_close_from_broker of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,696 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,696 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,697 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,697 DEBUG [-] Entering blocking state on frame <Channel.Open(['out_of_band='])>; acceptable_replies=[<class 'pika.spec.Channel.OpenOk'>]
2022-09-22 23:52:03,697 DEBUG [-] Adding on_synchronous_complete callback
2022-09-22 23:52:03,697 DEBUG [-] Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,698 DEBUG [-] Adding passed-in RPC response callback
2022-09-22 23:52:03,698 DEBUG [-] Added: {'callback': <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,698 DEBUG [-] Added: {'callback': <bound method BlockingChannel._on_consumer_cancelled_by_broker of <BlockingChannel impl=<Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,699 DEBUG [-] Added: {'callback': <bound method _CallbackResult.signal_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9525c0>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,699 DEBUG [-] Added: {'callback': <bound method BlockingChannel._on_channel_closed of <BlockingChannel impl=<Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>>, 'one_shot': False, 'only': <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>, 'arguments': None}
2022-09-22 23:52:03,699 DEBUG [-] Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9520c0>>, 'one_shot': False, 'only': None, 'arguments': None}
2022-09-22 23:52:03,699 INFO [-] Created channel=1
2022-09-22 23:52:03,700 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,700 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,701 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,701 DEBUG [-] Processing 1:Channel.OpenOk
2022-09-22 23:52:03,701 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,702 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,702 DEBUG [-] Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,702 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,702 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,703 DEBUG [-] Removing callback #0: {'callback': <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,703 DEBUG [-] Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.OpenOk"
2022-09-22 23:52:03,703 DEBUG [-] 0 blocked frames
2022-09-22 23:52:03,703 DEBUG [-] Calling <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.OpenOk"
2022-09-22 23:52:03,703 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,704 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,704 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,704 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,704 DEBUG [-] Entering blocking state on frame <Basic.Qos(['global_qos=False', 'prefetch_count=1', 'prefetch_size=0'])>; acceptable_replies=[<class 'pika.spec.Basic.QosOk'>]
2022-09-22 23:52:03,705 DEBUG [-] Adding on_synchronous_complete callback
2022-09-22 23:52:03,705 DEBUG [-] Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,705 DEBUG [-] Adding passed-in RPC response callback
2022-09-22 23:52:03,705 DEBUG [-] Added: {'callback': <bound method _CallbackResult.signal_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2022-09-22 23:52:03,706 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,706 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,706 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,706 DEBUG [-] Processing 1:Basic.QosOk
2022-09-22 23:52:03,707 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,707 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,707 DEBUG [-] Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,707 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,708 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,708 DEBUG [-] Removing callback #0: {'callback': <bound method _CallbackResult.signal_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
2022-09-22 23:52:03,708 DEBUG [-] Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.QosOk"
2022-09-22 23:52:03,708 DEBUG [-] 0 blocked frames
2022-09-22 23:52:03,708 DEBUG [-] Calling <bound method _CallbackResult.signal_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>> for "1:Basic.QosOk"
2022-09-22 23:52:03,709 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,709 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,709 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,709 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,710 DEBUG [-] Entering blocking state on frame <Queue.Declare(["arguments={'x-queue-type': 'quorum'}", 'auto_delete=False', 'durable=True', 'exclusive=False', 'nowait=False', 'passive=False', 'queue=platform_st2_jobs', 'ticket=0'])>; acceptable_replies=[(<class 'pika.spec.Queue.DeclareOk'>, {'queue': 'platform_st2_jobs'})]
2022-09-22 23:52:03,710 DEBUG [-] Adding on_synchronous_complete callback
2022-09-22 23:52:03,710 DEBUG [-] Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_st2_jobs'}, 'calls': 1}
2022-09-22 23:52:03,710 DEBUG [-] Adding passed-in RPC response callback
2022-09-22 23:52:03,711 DEBUG [-] Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_st2_jobs'}, 'calls': 1}
2022-09-22 23:52:03,711 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,711 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,712 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,712 DEBUG [-] Processing 1:Queue.DeclareOk
2022-09-22 23:52:03,712 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,712 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,713 DEBUG [-] Comparing {'queue': 'platform_st2_jobs'} to {'queue': 'platform_st2_jobs'}
2022-09-22 23:52:03,713 DEBUG [-] Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_st2_jobs'}, 'calls': 0}
2022-09-22 23:52:03,713 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,713 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,713 DEBUG [-] Comparing {'queue': 'platform_st2_jobs'} to {'queue': 'platform_st2_jobs'}
2022-09-22 23:52:03,714 DEBUG [-] Removing callback #0: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_st2_jobs'}, 'calls': 0}
2022-09-22 23:52:03,714 DEBUG [-] Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Queue.DeclareOk"
2022-09-22 23:52:03,714 DEBUG [-] 0 blocked frames
2022-09-22 23:52:03,714 DEBUG [-] Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9acb40>> for "1:Queue.DeclareOk"
2022-09-22 23:52:03,715 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,715 DEBUG [-] SelectorIOServicesAdapter.set_writer(9, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0>>)
2022-09-22 23:52:03,715 DEBUG [-] set_writer(9, _) updated handler RdWr
2022-09-22 23:52:03,715 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,716 DEBUG [-] Entering blocking state on frame <Basic.Consume(['arguments={}', 'consumer_tag=ctag1.377f0140f32f415198af3f0e779b023d', 'exclusive=False', 'no_ack=False', 'no_local=False', 'nowait=False', 'queue=platform_st2_jobs', 'ticket=0'])>; acceptable_replies=[(<class 'pika.spec.Basic.ConsumeOk'>, {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'})]
2022-09-22 23:52:03,716 DEBUG [-] Adding on_synchronous_complete callback
2022-09-22 23:52:03,716 DEBUG [-] Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}, 'calls': 1}
2022-09-22 23:52:03,716 DEBUG [-] Adding passed-in RPC response callback
2022-09-22 23:52:03,716 DEBUG [-] Added: {'callback': <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}, 'calls': 1}
2022-09-22 23:52:03,717 DEBUG [-] SelectorIOServicesAdapter.remove_writer(9)
2022-09-22 23:52:03,717 DEBUG [-] remove_writer(9) updated handler Rd
2022-09-22 23:52:03,718 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,720 DEBUG [-] Processing 1:Basic.ConsumeOk
2022-09-22 23:52:03,720 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,720 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,721 DEBUG [-] Comparing {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'} to {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}
2022-09-22 23:52:03,721 DEBUG [-] Removing callback #1: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}, 'calls': 0}
2022-09-22 23:52:03,721 DEBUG [-] Processing use of oneshot callback
2022-09-22 23:52:03,721 DEBUG [-] 0 registered uses left
2022-09-22 23:52:03,722 DEBUG [-] Comparing {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'} to {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}
2022-09-22 23:52:03,722 DEBUG [-] Removing callback #1: {'callback': <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.377f0140f32f415198af3f0e779b023d'}, 'calls': 0}
2022-09-22 23:52:03,722 DEBUG [-] Calling <bound method _CallbackResult.signal_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7fabdd9525c0>> for "1:Basic.ConsumeOk"
2022-09-22 23:52:03,724 DEBUG [-] Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.ConsumeOk"
2022-09-22 23:52:03,724 DEBUG [-] 0 blocked frames
2022-09-22 23:52:03,725 DEBUG [-] Calling <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabdd988ee0> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.ConsumeOk"
2022-09-22 23:52:03,725 DEBUG [-] Discarding frame <METHOD(['channel_number=1', 'frame_type=1', "method=<Basic.ConsumeOk(['consumer_tag=ctag1.377f0140f32f415198af3f0e779b023d'])>"])>
2022-09-22 23:52:03,725 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabdd9a40d0>
2022-09-22 23:52:03,725 INFO [-] Running sensor in passive mode
2022-09-22 23:52:03,726 INFO [-] Starting to consume messages from RabbitMQ for ['platform_st2_jobs']
2022-09-22 23:52:07,859 DEBUG [-] 1 active sensor(s)
2022-09-22 23:52:12,861 DEBUG [-] 1 active sensor(s)
2022-09-22 23:52:17,862 DEBUG [-] 1 active sensor(s)
^C2022-09-22 23:52:18,513 ERROR [-] Traceback (most recent call last):
2022-09-22 23:52:18,513 INFO [-] Container shutting down. Invoking cleanup on sensors.
2022-09-22 23:52:19,514 INFO [-] All sensors are shut down.
2022-09-22 23:52:19,514 DEBUG [-] Shutting down sensor watcher.
2022-09-22 23:52:19,521 DEBUG [-] Closed channel #1
2022-09-22 23:52:19,522 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': '[email protected]', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.3.4.5', 'product': 'RabbitMQ', 'version': '3.10.7'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2022-09-22 23:52:19,524 DEBUG [-] using channel_id: 1
2022-09-22 23:52:19,524 DEBUG [-] Channel open
2022-09-22 23:52:19,530 DEBUG [-] Closed channel #1
2022-09-22 23:52:19,530 INFO [-] (PID:39632) SensorContainer stopped. Reason - KeyboardInterrupt
-
Would it be possible to get some insight from the development team, please? TIA!
-
I'm going to start logging my troubleshooting findings, either to act as data collection or to serve as a log toward a resolution.
Troubleshooting Update:
  st2sensorcontainer-filewatch:
    image: ${ST2_IMAGE_REPO:-stackstorm/}st2sensorcontainer:${ST2_VERSION:-latest}
    restart: on-failure
    depends_on:
      - st2api
    networks:
      - private
    command:
      - "/opt/stackstorm/st2/bin/st2sensorcontainer"
      - "--config-file=/etc/st2/st2.conf"
      - "--config-file=/etc/st2/st2.docker.conf"
      - "--config-file=/etc/st2/st2.user.conf"
      - "--single-sensor-mode"
      - "--sensor-ref"
      - "linux.FileWatchSensor"
    volumes:
      - ./files/st2.docker.conf:/etc/st2/st2.docker.conf:ro
      - ./files/st2.user.conf:/etc/st2/st2.user.conf:ro
      - stackstorm-virtualenvs:/opt/stackstorm/virtualenvs:ro
      - stackstorm-packs:/opt/stackstorm/packs:ro
      - stackstorm-packs-configs:/opt/stackstorm/configs:ro
      - ${ST2_PACKS_DEV:-./packs.dev}:/opt/stackstorm/packs.dev:ro
And added an additional service to isolate the RabbitMQ sensor:
  st2sensorcontainer-rabbitmq:
    image: ${ST2_IMAGE_REPO:-stackstorm/}st2sensorcontainer:${ST2_VERSION:-latest}
    restart: on-failure
    depends_on:
      - st2api
    networks:
      - private
    command:
      - "/opt/stackstorm/st2/bin/st2sensorcontainer"
      - "--config-file=/etc/st2/st2.conf"
      - "--config-file=/etc/st2/st2.docker.conf"
      - "--config-file=/etc/st2/st2.user.conf"
      - "--single-sensor-mode"
      - "--sensor-ref"
      - "rabbitmq.RabbitMQQueueSensor"
      - "--debug"
    volumes:
      - ./files/st2.docker.conf:/etc/st2/st2.docker.conf:ro
      - ./files/st2.user.conf:/etc/st2/st2.user.conf:ro
      - stackstorm-virtualenvs:/opt/stackstorm/virtualenvs:ro
      - stackstorm-packs:/opt/stackstorm/packs:ro
      - stackstorm-packs-configs:/opt/stackstorm/configs:ro
      - ${ST2_PACKS_DEV:-./packs.dev}:/opt/stackstorm/packs.dev:ro
This resulted in the same behavior for I then added
Heartbeats are now occurring
2022-11-30 21:58:18,645 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:19,647 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:20,648 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:21,650 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:22,651 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:23,653 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:24,148 DEBUG [-] Sending heartbeat frame
2022-11-30 21:58:24,148 DEBUG [-] Sending heartbeat frame
2022-11-30 21:58:24,148 DEBUG [-] SelectorIOServicesAdapter.set_writer(7, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabd438fbb0>>)
2022-11-30 21:58:24,148 DEBUG [-] set_writer(7, _) updated handler RdWr
2022-11-30 21:58:24,149 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 21:58:24,150 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fabd438f3a0> with deadline=48514.242690353 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fabd438f3d0>>; now=48484.242690353; delay=30.0
2022-11-30 21:58:24,150 DEBUG [-] SelectorIOServicesAdapter.remove_writer(7)
2022-11-30 21:58:24,151 DEBUG [-] remove_writer(7) updated handler Rd
2022-11-30 21:58:24,151 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 21:58:24,639 DEBUG [-] Received heartbeat frame
2022-11-30 21:58:24,640 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 21:58:24,654 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:25,656 DEBUG [-] 1 active sensor(s)
2022-11-30 21:58:26,657 DEBUG [-] 1 active sensor(s)
I also noted that RabbitMQ actually sees an active consumer now, where it did not before.
Test message to the queue
2022-11-30 22:02:15,918 DEBUG [-] 1 active sensor(s)
2022-11-30 22:02:16,639 DEBUG [-] Recv would block on <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 22:02:16,639 DEBUG [-] Received message for queue platform_jobs with body b'test5\r\n'
2022-11-30 22:02:16,640 DEBUG [-] Added trace_context None to trigger rabbitmq.new_message.
2022-11-30 22:02:16,642 DEBUG [-] Dispatching trigger rabbitmq.new_message with payload {'queue': 'platform_jobs', 'body': 'test5\r\n'}.
2022-11-30 22:02:16,643 DEBUG [-] Dispatching trigger (trigger=rabbitmq.new_message,payload={'trigger': 'rabbitmq.new_message', 'payload': {'queue': 'platform_jobs', 'body': 'test5\r\n'}, 'trace_context': None})
2022-11-30 22:02:16,643 DEBUG [-] using channel_id: 1
2022-11-30 22:02:16,644 DEBUG [-] Channel open
2022-11-30 22:02:16,645 DEBUG [-] Closed channel #1
2022-11-30 22:02:16,646 DEBUG [-] SelectorIOServicesAdapter.set_writer(7, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fabd438fbb0>>)
2022-11-30 22:02:16,646 DEBUG [-] set_writer(7, _) updated handler RdWr
2022-11-30 22:02:16,646 DEBUG [-] Turned on writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 22:02:16,646 DEBUG [-] SelectorIOServicesAdapter.remove_writer(7)
2022-11-30 22:02:16,646 DEBUG [-] remove_writer(7) updated handler Rd
2022-11-30 22:02:16,646 DEBUG [-] Turned off writability watcher: <eventlet.greenio.base.GreenSocket object at 0x7fabd438f700>
2022-11-30 22:02:16,919 DEBUG [-] 1 active sensor(s)
So now we know it's a system/process permissions issue. I suppose it makes sense that this works in standalone, where the process can run as root, whereas by default the user for I think I'm close...
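Since the behavior changes with the user the process runs as, a quick check of the process identity and its access to the mounted directories can help narrow things down. The following is only a minimal diagnostic sketch, not something taken from StackStorm: the helper names `describe_process` and `access_report` are made up for illustration, and the paths are simply the volume mount points from the compose snippets above. The thread does not establish which resource (if any) is actually affected.

```python
import os


def describe_process():
    """Return the numeric identity the current process is running with."""
    return {"uid": os.getuid(), "euid": os.geteuid(), "gid": os.getgid()}


def access_report(paths):
    """Report, per path, whether it exists and is readable/writable.

    Note: os.access() checks with the *real* uid/gid of the process.
    """
    report = {}
    for path in paths:
        report[path] = {
            "exists": os.path.exists(path),
            "read": os.access(path, os.R_OK),
            "write": os.access(path, os.W_OK),
        }
    return report


if __name__ == "__main__":
    # Mount points taken from the docker-compose volumes shown above.
    mounts = [
        "/opt/stackstorm/virtualenvs",
        "/opt/stackstorm/packs",
        "/opt/stackstorm/configs",
    ]
    print(describe_process())
    for path, result in access_report(mounts).items():
        print(path, result)
```

Running it once as the default container user and once as root would show whether the two runs see these paths differently.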
-
Well, I don't think the permission issues are related to pika: This implies to me that it has to do with StackStorm's code for
-
Interestingly, trying the stock config for the for I also made sure that it wasn't something caused by explicitly adding the command and args. Same behavior even when adding them explicitly:
  st2sensorcontainer:
    image: ${ST2_IMAGE_REPO:-stackstorm/}st2sensorcontainer:${ST2_VERSION:-latest}
    restart: on-failure
    depends_on:
      - st2api
    networks:
      - private
    user: root
    command:
      - "/opt/stackstorm/st2/bin/st2sensorcontainer"
      - "--config-file=/etc/st2/st2.conf"
      - "--config-file=/etc/st2/st2.docker.conf"
      - "--config-file=/etc/st2/st2.user.conf"
      - "--debug"
    volumes:
      - ./files/st2.docker.conf:/etc/st2/st2.docker.conf:ro
      - ./files/st2.user.conf:/etc/st2/st2.user.conf:ro
      - stackstorm-virtualenvs:/opt/stackstorm/virtualenvs:ro
      - stackstorm-packs:/opt/stackstorm/packs:ro
      - stackstorm-packs-configs:/opt/stackstorm/configs:ro
      - ${ST2_PACKS_DEV:-./packs.dev}:/opt/stackstorm/packs.dev:ro
So far the only way I've seen this work is when the sensor is run in standalone mode and the user is set to Curiouser and curiouser.
-
How about the other sensor? Definitely worth trying. If it's only the RabbitMQ sensor that behaves that way, one suspect could be the asynchronous/threading work that the RabbitMQ sensor does, which might conflict with the StackStorm sensor core itself, but I'm not sure: The fact that the results differ between root and non-root makes it even weirder.
-
I took a look at the new sensor contributed by @stnley, and I stumbled upon some weird behavior regarding I could get this new sensor working via docker in one specific scenario; A fresh The below is using the 1.2.0 version from StackStorm-Exchange/stackstorm-rabbitmq#21 with the intent of just running all sensors in the single container to start.
Single Sensor Container (working initially)
This includes the sensors: filewatch and RabbitMQBoundQueueSensor
2023-01-12 10:10:14 2023-01-12 18:10:14,425 INFO [-] Using Python: 3.8.10 (/opt/stackstorm/st2/bin/python)
2023-01-12 10:10:14 2023-01-12 18:10:14,425 INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: en_US.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
2023-01-12 10:10:14 2023-01-12 18:10:14,426 INFO [-] Using config files: /etc/st2/st2.conf,/etc/st2/st2.docker.conf,/etc/st2/st2.user.conf
2023-01-12 10:10:14 2023-01-12 18:10:14,426 INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
2023-01-12 10:10:14 2023-01-12 18:10:14,426 INFO [-] Using coordination driver: redis
2023-01-12 10:10:14 2023-01-12 18:10:14,426 INFO [-] Using metrics driver: noop
2023-01-12 10:10:14 2023-01-12 18:10:14,436 INFO [-] Connecting to database "st2" @ "mongo:27017" as user "None".
2023-01-12 10:10:14 2023-01-12 18:10:14,447 INFO [-] Successfully connected to database "st2" @ "mongo:27017" as user "None".
2023-01-12 10:10:14 2023-01-12 18:10:14,660 INFO [-] Using partitioner default with sensornode sensornode1.
2023-01-12 10:10:14 2023-01-12 18:10:14,662 INFO [-] Found 2 registered sensors in db scan.
2023-01-12 10:10:14 2023-01-12 18:10:14,663 INFO [-] Setting up container to run 2 sensors.
2023-01-12 10:10:14 2023-01-12 18:10:14,664 INFO [-] Sensors list - ['linux.FileWatchSensor', 'rabbitmq.RabbitMQBoundQueueSensor'].
2023-01-12 10:10:14 2023-01-12 18:10:14,664 INFO [-] (PID:1) SensorContainer started.
2023-01-12 10:10:14 2023-01-12 18:10:14,664 INFO [-] Running sensor linux.FileWatchSensor
2023-01-12 10:10:14 2023-01-12 18:10:14,673 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2023-01-12 10:10:14 2023-01-12 18:10:14,691 INFO [-] Sensor linux.FileWatchSensor started
2023-01-12 10:10:14 2023-01-12 18:10:14,691 INFO [-] Running sensor rabbitmq.RabbitMQBoundQueueSensor
2023-01-12 10:10:14 2023-01-12 18:10:14,711 INFO [-] Sensor rabbitmq.RabbitMQBoundQueueSensor started
2023-01-12 10:10:15 2023-01-12 18:10:15,403 INFO [-] No config found for sensor "FileWatchSensor"
2023-01-12 10:10:15 2023-01-12 18:10:15,404 INFO [-] Watcher started
2023-01-12 10:10:15 2023-01-12 18:10:15,404 INFO [-] Running sensor initialization code
2023-01-12 10:10:15 2023-01-12 18:10:15,409 INFO [-] Running sensor in passive mode
2023-01-12 10:10:15 2023-01-12 18:10:15,420 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] Found config for sensor "RabbitMQBoundQueueSensor"
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] USERDEBUG ReconnectingConsumer.init complete
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] USERDEBUG main init complete
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] Watcher started
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] Running sensor initialization code
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] USERDEBUG ReconnectingConsumer.connect
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] USERDEBUG ReconnectingConsumer._open_connection
2023-01-12 10:10:15 2023-01-12 18:10:15,448 INFO [-] Running sensor in passive mode
2023-01-12 10:10:15 2023-01-12 18:10:15,448 INFO [-] USERDEBUG main run
2023-01-12 10:10:15 2023-01-12 18:10:15,449 INFO [-] USERDEBUG ReconnectingConsumer.run begin
2023-01-12 10:10:15 2023-01-12 18:10:15,452 INFO [-] Pika version 1.3.1 connecting to ('192.168.112.5', 5672)
2023-01-12 10:10:15 2023-01-12 18:10:15,454 INFO [-] Socket connected: <eventlet.greenio.base.GreenSocket object at 0x7f4f0586adf0>
2023-01-12 10:10:15 2023-01-12 18:10:15,454 INFO [-] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>).
2023-01-12 10:10:15 2023-01-12 18:10:15,457 INFO [-] USERDEBUG ReconnectingConsumer._on_connection_open
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] USERDEBUG ReconnectingConsumer._open_channel
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
2023-01-12 10:10:15 2023-01-12 18:10:15,460 INFO [-] USERDEBUG ReconnectingConsumer._on_channel_open
2023-01-12 10:10:15 2023-01-12 18:10:15,461 INFO [-] USERDEBUG ReconnectingConsumer._setup_exchanges
2023-01-12 10:10:15 2023-01-12 18:10:15,463 INFO [-] USERDEBUG ReconnectingConsumer._on_exchange_declare_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,463 INFO [-] USERDEBUG ReconnectingConsumer._setup_queues
2023-01-12 10:10:15 2023-01-12 18:10:15,466 INFO [-] USERDEBUG ReconnectingConsumer._on_queue_declare_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,466 INFO [-] USERDEBUG ReconnectingConsumer._setup_bindings
2023-01-12 10:10:15 2023-01-12 18:10:15,467 INFO [-] USERDEBUG ReconnectingConsumer._on_bind_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,467 INFO [-] USERDEBUG ReconnectingConsumer._start_consuming
Single Sensor Container with container restart (stops working)
Notice that after the container is restarted in place, the sensor fails to progress past
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] Found config for sensor "RabbitMQBoundQueueSensor"
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] USERDEBUG ReconnectingConsumer.init complete
2023-01-12 10:10:15 2023-01-12 18:10:15,440 INFO [-] USERDEBUG main init complete
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] Watcher started
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] Running sensor initialization code
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] USERDEBUG ReconnectingConsumer.connect
2023-01-12 10:10:15 2023-01-12 18:10:15,441 INFO [-] USERDEBUG ReconnectingConsumer._open_connection
2023-01-12 10:10:15 2023-01-12 18:10:15,448 INFO [-] Running sensor in passive mode
2023-01-12 10:10:15 2023-01-12 18:10:15,448 INFO [-] USERDEBUG main run
2023-01-12 10:10:15 2023-01-12 18:10:15,449 INFO [-] USERDEBUG ReconnectingConsumer.run begin
2023-01-12 10:10:15 2023-01-12 18:10:15,452 INFO [-] Pika version 1.3.1 connecting to ('192.168.112.5', 5672)
2023-01-12 10:10:15 2023-01-12 18:10:15,454 INFO [-] Socket connected: <eventlet.greenio.base.GreenSocket object at 0x7f4f0586adf0>
2023-01-12 10:10:15 2023-01-12 18:10:15,454 INFO [-] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>).
2023-01-12 10:10:15 2023-01-12 18:10:15,457 INFO [-] USERDEBUG ReconnectingConsumer._on_connection_open
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] USERDEBUG ReconnectingConsumer._open_channel
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
2023-01-12 10:10:15 2023-01-12 18:10:15,458 INFO [-] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f4f05a97940> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
2023-01-12 10:10:15 2023-01-12 18:10:15,460 INFO [-] USERDEBUG ReconnectingConsumer._on_channel_open
2023-01-12 10:10:15 2023-01-12 18:10:15,461 INFO [-] USERDEBUG ReconnectingConsumer._setup_exchanges
2023-01-12 10:10:15 2023-01-12 18:10:15,463 INFO [-] USERDEBUG ReconnectingConsumer._on_exchange_declare_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,463 INFO [-] USERDEBUG ReconnectingConsumer._setup_queues
2023-01-12 10:10:15 2023-01-12 18:10:15,466 INFO [-] USERDEBUG ReconnectingConsumer._on_queue_declare_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,466 INFO [-] USERDEBUG ReconnectingConsumer._setup_bindings
2023-01-12 10:10:15 2023-01-12 18:10:15,467 INFO [-] USERDEBUG ReconnectingConsumer._on_bind_ok
2023-01-12 10:10:15 2023-01-12 18:10:15,467 INFO [-] USERDEBUG ReconnectingConsumer._start_consuming
2023-01-12 10:11:27 2023-01-12 18:11:27,534 INFO [-] Container shutting down. Invoking cleanup on sensors.
2023-01-12 10:11:29 2023-01-12 18:11:29,535 INFO [-] All sensors are shut down.
2023-01-12 10:11:29 2023-01-12 18:11:29,549 INFO [-] (PID:1) SensorContainer stopped. Reason - SystemExit
2023-01-12 10:11:31 2023-01-12 18:11:31,047 INFO [-] Using Python: 3.8.10 (/opt/stackstorm/st2/bin/python)
2023-01-12 10:11:31 2023-01-12 18:11:31,047 INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: en_US.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
2023-01-12 10:11:31 2023-01-12 18:11:31,047 INFO [-] Using config files: /etc/st2/st2.conf,/etc/st2/st2.docker.conf,/etc/st2/st2.user.conf
2023-01-12 10:11:31 2023-01-12 18:11:31,048 INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
2023-01-12 10:11:31 2023-01-12 18:11:31,048 INFO [-] Using coordination driver: redis
2023-01-12 10:11:31 2023-01-12 18:11:31,048 INFO [-] Using metrics driver: noop
2023-01-12 10:11:31 2023-01-12 18:11:31,058 INFO [-] Connecting to database "st2" @ "mongo:27017" as user "None".
2023-01-12 10:11:31 2023-01-12 18:11:31,068 INFO [-] Successfully connected to database "st2" @ "mongo:27017" as user "None".
2023-01-12 10:11:31 2023-01-12 18:11:31,273 INFO [-] Using partitioner default with sensornode sensornode1.
2023-01-12 10:11:31 2023-01-12 18:11:31,275 INFO [-] Found 2 registered sensors in db scan.
2023-01-12 10:11:31 2023-01-12 18:11:31,276 INFO [-] Setting up container to run 2 sensors.
2023-01-12 10:11:31 2023-01-12 18:11:31,276 INFO [-] Sensors list - ['linux.FileWatchSensor', 'rabbitmq.RabbitMQBoundQueueSensor'].
2023-01-12 10:11:31 2023-01-12 18:11:31,277 INFO [-] (PID:1) SensorContainer started.
2023-01-12 10:11:31 2023-01-12 18:11:31,278 INFO [-] Running sensor linux.FileWatchSensor
2023-01-12 10:11:31 2023-01-12 18:11:31,299 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2023-01-12 10:11:31 2023-01-12 18:11:31,304 INFO [-] Sensor linux.FileWatchSensor started
2023-01-12 10:11:31 2023-01-12 18:11:31,305 INFO [-] Running sensor rabbitmq.RabbitMQBoundQueueSensor
2023-01-12 10:11:31 2023-01-12 18:11:31,319 INFO [-] Sensor rabbitmq.RabbitMQBoundQueueSensor started
2023-01-12 10:11:31 2023-01-12 18:11:31,952 INFO [-] No config found for sensor "FileWatchSensor"
2023-01-12 10:11:31 2023-01-12 18:11:31,952 INFO [-] Watcher started
2023-01-12 10:11:31 2023-01-12 18:11:31,952 INFO [-] Running sensor initialization code
2023-01-12 10:11:31 2023-01-12 18:11:31,957 INFO [-] Running sensor in passive mode
2023-01-12 10:11:31 2023-01-12 18:11:31,969 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2023-01-12 10:11:31 2023-01-12 18:11:31,989 INFO [-] Found config for sensor "RabbitMQBoundQueueSensor"
2023-01-12 10:11:31 2023-01-12 18:11:31,989 INFO [-] USERDEBUG ReconnectingConsumer.init complete
2023-01-12 10:11:31 2023-01-12 18:11:31,989 INFO [-] USERDEBUG main init complete
2023-01-12 10:11:31 2023-01-12 18:11:31,990 INFO [-] Watcher started
2023-01-12 10:11:31 2023-01-12 18:11:31,990 INFO [-] Running sensor initialization code
2023-01-12 10:11:31 2023-01-12 18:11:31,990 INFO [-] USERDEBUG ReconnectingConsumer.connect
2023-01-12 10:11:31 2023-01-12 18:11:31,990 INFO [-] USERDEBUG ReconnectingConsumer._open_connection
2023-01-12 10:11:31 2023-01-12 18:11:31,996 INFO [-] Running sensor in passive mode
2023-01-12 10:11:31 2023-01-12 18:11:31,996 INFO [-] USERDEBUG main run
2023-01-12 10:11:31 2023-01-12 18:11:31,996 INFO [-] USERDEBUG ReconnectingConsumer.run begin
<end of log>

What could be happening that it works perfectly fine on first load, but a simple container restart causes it to fail?
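One way to narrow this down is to compare the hand-added `USERDEBUG` markers between a working run and a failing run and list the stages the failing run never reached. The following is only a minimal sketch using the standard library; the helper names `userdebug_stages` and `missing_stages` are hypothetical and not part of StackStorm or the rabbitmq pack, and it assumes each marker sits at the end of its log line, as in the logs above.

```python
import re

# Matches the hand-added "USERDEBUG <stage>" markers at the end of a log line.
USERDEBUG_RE = re.compile(r"USERDEBUG\s+(.+?)\s*$")


def userdebug_stages(log_text):
    """Return the USERDEBUG stage names in the order they appear in the log."""
    stages = []
    for line in log_text.splitlines():
        match = USERDEBUG_RE.search(line)
        if match:
            stages.append(match.group(1))
    return stages


def missing_stages(working_log, failing_log):
    """Return stages seen in the working run but never in the failing run."""
    seen = set(userdebug_stages(failing_log))
    result = []
    for stage in userdebug_stages(working_log):
        if stage not in seen and stage not in result:
            result.append(stage)
    return result


if __name__ == "__main__":
    # Sample lines copied from the two kinds of log shown in this thread.
    failing = (
        "2023-01-12 18:11:31,990 INFO [-] USERDEBUG ReconnectingConsumer.connect\n"
        "2023-01-12 18:11:31,996 INFO [-] USERDEBUG ReconnectingConsumer.run begin\n"
    )
    working = (
        "INFO:root:USERDEBUG ReconnectingConsumer.connect\n"
        "INFO:root:USERDEBUG ReconnectingConsumer.run begin\n"
        "INFO:root:USERDEBUG ReconnectingConsumer._on_connection_open\n"
    )
    print(missing_stages(working, failing))
```

The first entry in the returned list is the earliest callback that fired in the working run but not in the failing one, which is where the two runs diverge.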
I was able to get this sensor working completely independently of StackStorm. There is definitely something going on with how the

To summarize a little bit:
Run Sensor code without ST2 Wrapping

I ran this in the exact same container that fails to run this code successfully in

Modified Sensor Code

NOTE: requires pyyaml and pika

import copy
import functools
import json
import logging
import pickle

import pika
import pika.exchange_type
import yaml


def load_config():
    with open("/opt/stackstorm/configs/rabbitmq.yaml", "r") as file:
        return yaml.safe_load(file)


logging.basicConfig(level=logging.DEBUG)


class ReconnectingConsumer:
    AMQP_PREFETCH = 1
    DEFAULT_USERNAME = pika.ConnectionParameters.DEFAULT_USERNAME
    DEFAULT_PASSWORD = pika.ConnectionParameters.DEFAULT_PASSWORD
    DEFAULT_EXCHANGE = pika.exchange_type.ExchangeType.direct
    DESERIALIZATION_FUNCTIONS = {"json": json.loads, "pickle": pickle.loads}

    # def __init__(self, logger, config, message_dispatch) -> None:
    def __init__(
        self, logging=logging, config=load_config()["sensor_binding_config"]
    ) -> None:
        self.should_reconnect = False
        self.was_consuming = False
        # self._logger = logger
        self._logger = logging
        self._config = config
        # self.dispatch = message_dispatch
        self._host = self._config["host"]
        self._username = self._config.get("username", self.DEFAULT_USERNAME)
        self._password = self._config.get("password", self.DEFAULT_PASSWORD)
        self._conn = None
        self._channel = None
        self._closing = False
        self._consuming = False
        self._consumer_tags = set()
        self._deserialization_method = self._config.get(
            "deserialization_method", "json"
        )
        if self._deserialization_method not in self.DESERIALIZATION_FUNCTIONS:
            raise ValueError(
                "Invalid deserialization method specified: %s"
                % (self._deserialization_method)
            )
        self._logger.info("USERDEBUG ReconnectingConsumer.init complete")

    def run(self):
        self._logger.info("USERDEBUG ReconnectingConsumer.run begin")
        self._conn.ioloop.start()

    def stop(self):
        self._logger.info("USERDEBUG ReconnectingConsumer.stop")
        if not self._closing:
            self._closing = True
            self._logger.info("Stopping")
            if self._consuming:
                self._stop_consuming()
                self._conn.ioloop.start()
            else:
                self._conn.ioloop.stop()
            self._logger.info("Stopped")

    def connect(self):
        self._logger.info("USERDEBUG ReconnectingConsumer.connect")
        credentials = pika.PlainCredentials(self._username, self._password)
        connection_params = pika.ConnectionParameters(
            host=self._host, credentials=credentials
        )
        self._conn = self._open_connection(connection_params)

    def _reconnect(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._reconnect")
        self.should_reconnect = True
        self.stop()

    def _open_connection(self, params):
        self._logger.info("USERDEBUG ReconnectingConsumer._open_connection")
        return pika.SelectConnection(
            params,
            on_open_callback=self._on_connection_open,
            on_open_error_callback=self._on_connection_open_error,
            on_close_callback=self._on_connection_close,
        )

    def _close_connection(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._close_connection")
        self._consuming = False
        if self._conn.is_closing or self._conn.is_closed:
            self._logger.debug("Connection is closing or already closed")
        else:
            self._logger.debug("Closing connection")
            self._conn.close()

    def _on_connection_open(self, connection):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_connection_open")
        self._logger.debug("Connection opened")
        self._open_channel()

    def _on_connection_open_error(self, connection, err):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_connection_open_error")
        self._logger.error("Connection open failed: %s", err)
        self._reconnect()

    def _on_connection_close(self, connection, reason):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_connection_close")
        self._channel = None
        if self._closing:
            self._conn.ioloop.stop()
        else:
            self._logger.warning("Connection closed, reconnect necessary: %s", reason)
            self._reconnect()

    def _open_channel(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._open_channel")
        self._logger.debug("Creating new channel")
        self._conn.channel(on_open_callback=self._on_channel_open)

    def _close_channel(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._close_channel")
        self._logger.debug("Closing channel")
        self._channel.close()

    def _on_channel_open(self, channel):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_channel_open")
        self._logger.debug("Channel opened")
        self._channel = channel
        self._logger.debug("Adding channel close callback")
        self._channel.add_on_close_callback(self._on_channel_closed)
        self._logger.debug("Setting channel prefetch: %s", self.AMQP_PREFETCH)
        self._channel.basic_qos(
            prefetch_count=self.AMQP_PREFETCH, callback=self._on_basic_qos_ok
        )

    def _on_channel_closed(self, channel, reason):
        self._logger.warning("Channel %i was closed: %s", channel, reason)
        self._close_connection()

    def _on_basic_qos_ok(self, frame):
        self._logger.debug("QOS set to: %s", self.AMQP_PREFETCH)
        self._setup_exchanges()

    def _setup_exchanges(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._setup_exchanges")
        self._logger.debug("Setting up configured exchanges")
        for exchange in self._config.get("exchanges", list()):
            name = exchange["name"]
            exchange_type = exchange.get("exchange_type", self.DEFAULT_EXCHANGE)
            queues = exchange.get("queues", list())
            self._logger.debug("Declaring exchange: %s", name)
            # pass config object as extra argument in callback
            callback = functools.partial(
                self._on_exchange_declare_ok, userdata=(queues, name)
            )
            self._channel.exchange_declare(
                exchange=name, exchange_type=exchange_type, callback=callback
            )

    def _on_exchange_declare_ok(self, frame, userdata):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_exchange_declare_ok")
        queues, exchange = userdata
        self._logger.debug("Exchange declared: %s", exchange)
        self._setup_queues(queues, exchange)

    def _setup_queues(self, queues, exchange):
        self._logger.info("USERDEBUG ReconnectingConsumer._setup_queues")
        self._logger.debug("Setting up configured queues for exchange %s", exchange)
        for queue in queues:
            self._logger.debug("Declaring queue: %s", queue["name"])
            callback = functools.partial(
                self._on_queue_declare_ok, userdata=(queue, exchange)
            )
            self._channel.queue_declare(queue=queue["name"], callback=callback)

    def _on_queue_declare_ok(self, frame, userdata):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_queue_declare_ok")
        queue, exchange = userdata
        self._logger.debug("Queue declared: %s", queue["name"])
        self._setup_bindings(queue, exchange)

    def _setup_bindings(self, queue, exchange):
        self._logger.info("USERDEBUG ReconnectingConsumer._setup_bindings")
        queue, bindings = queue["name"], queue.get("bindings", list())
        self._logger.debug("Setting up bindings for queue %s", queue)
        for binding in bindings:
            self._logger.debug("Declaring binding for queue: %s (%s)", queue, bindings)
            routing_key, arguments = binding.get("routing_key"), binding.get(
                "arguments"
            )
            callback = functools.partial(self._on_bind_ok, userdata=(binding, queue))
            self._channel.queue_bind(
                queue=queue,
                exchange=exchange,
                routing_key=routing_key,
                arguments=arguments,
                callback=callback,
            )

    def _on_bind_ok(self, frame, userdata):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_bind_ok")
        binding, queue = userdata
        self._logger.debug("Binding ok for queue: %s (%s)", queue, binding)
        self._start_consuming(queue)

    def _start_consuming(self, queue):
        self._logger.info("USERDEBUG ReconnectingConsumer._start_consuming")
        self._logger.debug("Issuing consumer related RPC commands")
        callback = functools.partial(self._on_message, userdata=queue)
        consumer_tag = self._channel.basic_consume(queue, callback)
        self._consumer_tags.add(consumer_tag)
        if not self._consuming:
            # only add callback once
            self._logger.debug("Adding consumer cancellation callback")
            self._channel.add_on_cancel_callback(self._on_consumer_cancelled)
        self._consuming = True
        self.was_consuming = True

    def _stop_consuming(self):
        self._logger.info("USERDEBUG ReconnectingConsumer._stop_consuming")
        if self._channel:
            consumers_copy = copy.deepcopy(self._consumer_tags)
            for consumer_tag in consumers_copy:
                self._logger.debug(
                    "Sending a Basic.Cancel RPC command to RabbitMQ for consumer %s",
                    consumer_tag,
                )
                callback = functools.partial(
                    self._on_consumer_cancelled_ok, userdata=consumer_tag
                )
                self._channel.basic_cancel(consumer_tag, callback)

    def _on_consumer_cancelled(self, frame):
        consumer_tag = frame.method.consumer_tag
        self._logger.warning("Consumer was cancelled remotely: %s", consumer_tag)
        self._consumer_tags.discard(consumer_tag)

    def _on_consumer_cancelled_ok(self, frame, userdata):
        self._logger.debug(
            "RabbitMQ acknowledged the cancellation of the consumer: %s", userdata
        )
        self._consumer_tags.discard(userdata)
        if not self._consumer_tags:
            # we either are shutting down or all consumers have been cancelled
            self._close_channel()

    def _on_message(self, channel, basic_deliver, properties, body, userdata):
        self._logger.info("USERDEBUG ReconnectingConsumer._on_message")
        body = body.decode("utf-8")
        self._logger.debug("Received message for queue %s with body %s", userdata, body)
        body = self._deserialize_body(body)
        payload = {"queue": userdata, "body": body}
        try:
            # self._dispatch(payload=payload)
            print(payload)
        finally:
            self._channel.basic_ack(basic_deliver.delivery_tag)

    def _deserialize_body(self, body):
        if not self._deserialization_method:
            return body
        deserialization_func = self.DESERIALIZATION_FUNCTIONS[
            self._deserialization_method
        ]
        try:
            body = deserialization_func(body)
        except json.JSONDecodeError:
            pass
        return body

The important part is that it all connected, and when I published the message it was received (present in the log below):

INFO:root:USERDEBUG ReconnectingConsumer._on_message
DEBUG:root:Received message for queue platform_jobs with body test4
{'queue': 'platform_jobs', 'body': 'test4'}

Huge DEBUG Execution Log

# python3
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from test import ReconnectingConsumer
>>> sensor = ReconnectingConsumer()
INFO:root:USERDEBUG ReconnectingConsumer.init complete
>>> sensor.connect()
INFO:root:USERDEBUG ReconnectingConsumer.connect
INFO:root:USERDEBUG ReconnectingConsumer._open_connection
DEBUG:pika.adapters.select_connection:Using EPollPoller
DEBUG:pika.connection:New Connection state: CLOSED (prev=CLOSED)
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_start of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_close_from_broker of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_connection_open_error of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_connection_open of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_connection_close of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.connection:New Connection state: INIT (prev=CLOSED)
DEBUG:pika.adapters.utils.connection_workflow:Starting AMQP Connection workflow asynchronously.
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6340> with deadline=602575.51652207 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fccbdfc63d0>>, first=True); now=602575.51652207; delay=0
>>>
>>>
>>> sensor.run()
INFO:root:USERDEBUG ReconnectingConsumer.run begin
DEBUG:pika.adapters.select_connection:Entering IOLoop
DEBUG:pika.adapters.utils.connection_workflow:Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfb9b80> with deadline=602656.88027572 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7fccbdfc63d0>>; now=602656.88027572; delay=0
DEBUG:pika.adapters.utils.connection_workflow:_try_next_config_async: 'rabbitmq.service.us-west-2.consul':5672
DEBUG:pika.adapters.select_connection:add_callback_threadsafe: added callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fccbdfb98e0>>
DEBUG:pika.adapters.select_connection:process_timeouts: invoking callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7fccbdfb98e0>>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:Invoking asynchronous getaddrinfo() completion callback; host='rabbitmq.service.us-west-2.consul'
DEBUG:pika.adapters.utils.connection_workflow:getaddrinfo returned 2 records
DEBUG:pika.adapters.utils.connection_workflow:Attempting to connect using address record (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('172.16.104.96', 5672))
INFO:pika.adapters.utils.connection_workflow:Pika version 1.3.1 connecting to ('172.16.104.96', 5672)
DEBUG:pika.adapters.select_connection:add_callback_threadsafe: added callback=<bound method _AsyncSocketConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fccbdfc6640>>
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6940> with deadline=602666.892637374 and callback=<bound method AMQPConnector._on_tcp_connection_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>; now=602656.892637374; delay=10.0
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6670> with deadline=602671.892703394 and callback=<bound method AMQPConnector._on_overall_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>; now=602656.892703394; delay=15.0
DEBUG:pika.adapters.select_connection:process_timeouts: invoking callback=<bound method _AsyncSocketConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fccbdfc6640>>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncSocketConnector._on_writable of <pika.adapters.utils.io_services_utils._AsyncSocketConnector object at 0x7fccbdfc6640>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) added handler Wr
DEBUG:pika.adapters.utils.io_services_utils:Connection-establishment is in progress for <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440)>.
INFO:pika.adapters.utils.io_services_utils:Socket connected: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.io_services_utils:_AsyncSocketConnector._report_completion(None); <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) removed handler
DEBUG:pika.adapters.select_connection:remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6940> with deadline=602666.892637374 and callback=<bound method AMQPConnector._on_tcp_connection_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>
DEBUG:pika.adapters.utils.connection_workflow:TCP connection to broker established: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>.
DEBUG:pika.adapters.utils.io_services_utils:_AsyncStreamConnector.start(); <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:add_callback_threadsafe: added callback=<bound method _AsyncStreamConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncStreamConnector object at 0x7fccbdfc6760>>
DEBUG:pika.adapters.select_connection:process_timeouts: invoking callback=<bound method _AsyncStreamConnector._start_async of <pika.adapters.utils.io_services_utils._AsyncStreamConnector object at 0x7fccbdfc6760>>
DEBUG:pika.adapters.utils.io_services_utils:_AsyncStreamConnector._start_async(); <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.io_services_utils:_AsyncStreamConnector._linkup()
DEBUG:pika.adapters.utils.io_services_utils:_AsyncTransportBase.__init__: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_reader(6, <bound method _AsyncPlaintextTransport._on_socket_readable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_reader(6, _) added handler Rd
DEBUG:pika.adapters.utils.io_services_utils:_linkup(): created transport <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>
DEBUG:pika.connection:New Connection state: PROTOCOL (prev=INIT)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.io_services_utils:_linkup(): introduced transport to protocol <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>; _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
DEBUG:pika.adapters.utils.io_services_utils:_AsyncStreamConnector._report_completion((<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>)); <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
INFO:pika.adapters.utils.connection_workflow:Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>).
DEBUG:pika.callback:Added: {'callback': <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.adapters.utils.io_services_utils:_AsyncStreamConnector._cleanup(False)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 0:Connection.Start
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Connection._on_connection_start of <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Connection._on_connection_start of <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>> for "0:Connection.Start"
DEBUG:pika.connection:New Connection state: START (prev=PROTOCOL)
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 0:Connection.Tune
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Connection._on_connection_tune of <SelectConnection START transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>> for "0:Connection.Tune"
DEBUG:pika.connection:New Connection state: TUNE (prev=START)
DEBUG:pika.connection:Creating a HeartbeatChecker: 60
DEBUG:pika.heartbeat:timeout: 60.000000 send_interval: 30.000000 check_interval: 65.000000
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc61c0> with deadline=602686.900723801 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602656.900723801; delay=30.0
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6a00> with deadline=602721.900845961 and callback=<bound method HeartbeatChecker._check_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602656.900845961; delay=65
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 0:Connection.OpenOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Connection._on_connection_open_ok of <SelectConnection TUNE transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>> for "0:Connection.OpenOk"
DEBUG:pika.connection:New Connection state: OPEN (prev=TUNE)
DEBUG:pika.callback:Processing 0:_on_connection_open_ok
DEBUG:pika.callback:Calling <bound method ReconnectingConsumer._on_connection_open of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>> for "0:_on_connection_open_ok"
INFO:root:USERDEBUG ReconnectingConsumer._on_connection_open
DEBUG:root:Connection opened
INFO:root:USERDEBUG ReconnectingConsumer._open_channel
DEBUG:root:Creating new channel
DEBUG:pika.connection:Creating channel 1
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_channel_cleanup of <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': <Channel number=1 CLOSED conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_getempty of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_cancel of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_flow of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_close_from_broker of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Channel.Open(['out_of_band='])>; acceptable_replies=[<class 'pika.spec.Channel.OpenOk'>]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Calling <bound method AMQPConnector._on_amqp_handshake_done of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>> for "0:_on_connection_open_ok"
DEBUG:pika.adapters.utils.connection_workflow:AMQPConnector: AMQP handshake attempt completed; state=3; error=None; 'rabbitmq.service.us-west-2.consul'/(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('172.16.104.96', 5672))
DEBUG:pika.adapters.utils.connection_workflow:AMQPConnector: AMQP connection established for 'rabbitmq.service.us-west-2.consul'/(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('172.16.104.96', 5672)): <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
INFO:pika.adapters.utils.connection_workflow:AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
DEBUG:pika.adapters.select_connection:remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6670> with deadline=602671.892703394 and callback=<bound method AMQPConnector._on_overall_timeout of <pika.adapters.utils.connection_workflow.AMQPConnector object at 0x7fccbdfc67f0>>
DEBUG:pika.adapters.utils.connection_workflow:Connection attempt completed with <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
INFO:pika.adapters.utils.connection_workflow:AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
DEBUG:pika.adapters.base_connection:Full-stack connection workflow completed: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Channel.OpenOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.OpenOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling <bound method Channel._on_openok of <Channel number=1 OPENING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.OpenOk"
INFO:root:USERDEBUG ReconnectingConsumer._on_channel_open
DEBUG:root:Channel opened
DEBUG:root:Adding channel close callback
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_channel_closed of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': False, 'only': <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'arguments': None}
DEBUG:root:Setting channel prefetch: 1
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Basic.Qos(['global_qos=False', 'prefetch_count=1', 'prefetch_size=0'])>; acceptable_replies=[<class 'pika.spec.Basic.QosOk'>]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_basic_qos_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Basic.QosOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method ReconnectingConsumer._on_basic_qos_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.QosOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling <bound method ReconnectingConsumer._on_basic_qos_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>> for "1:Basic.QosOk"
DEBUG:root:QOS set to: 1
INFO:root:USERDEBUG ReconnectingConsumer._setup_exchanges
DEBUG:root:Setting up configured exchanges
DEBUG:root:Declaring exchange: platform
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Exchange.Declare(['arguments={}', 'auto_delete=False', 'durable=False', 'exchange=platform', 'internal=False', 'nowait=False', 'passive=False', 'ticket=0', 'type=direct'])>; acceptable_replies=[<class 'pika.spec.Exchange.DeclareOk'>]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': functools.partial(<bound method ReconnectingConsumer._on_exchange_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=([{'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}], 'platform')), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Exchange.DeclareOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': functools.partial(<bound method ReconnectingConsumer._on_exchange_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=([{'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}], 'platform')), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Exchange.DeclareOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling functools.partial(<bound method ReconnectingConsumer._on_exchange_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=([{'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}], 'platform')) for "1:Exchange.DeclareOk"
INFO:root:USERDEBUG ReconnectingConsumer._on_exchange_declare_ok
DEBUG:root:Exchange declared: platform
INFO:root:USERDEBUG ReconnectingConsumer._setup_queues
DEBUG:root:Setting up configured queues for exchange platform
DEBUG:root:Declaring queue: platform_jobs
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Queue.Declare(['arguments={}', 'auto_delete=False', 'durable=False', 'exclusive=False', 'nowait=False', 'passive=False', 'queue=platform_jobs', 'ticket=0'])>; acceptable_replies=[(<class 'pika.spec.Queue.DeclareOk'>, {'queue': 'platform_jobs'})]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_jobs'}, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': functools.partial(<bound method ReconnectingConsumer._on_queue_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}, 'platform')), 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_jobs'}, 'calls': 1}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Queue.DeclareOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'queue': 'platform_jobs'} to {'queue': 'platform_jobs'}
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_jobs'}, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'queue': 'platform_jobs'} to {'queue': 'platform_jobs'}
DEBUG:pika.callback:Removing callback #0: {'callback': functools.partial(<bound method ReconnectingConsumer._on_queue_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}, 'platform')), 'one_shot': True, 'only': None, 'arguments': {'queue': 'platform_jobs'}, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Queue.DeclareOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling functools.partial(<bound method ReconnectingConsumer._on_queue_declare_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'name': 'platform_jobs', 'bindings': [{'routing_key': 'platform.orchestration.jobs'}]}, 'platform')) for "1:Queue.DeclareOk"
INFO:root:USERDEBUG ReconnectingConsumer._on_queue_declare_ok
DEBUG:root:Queue declared: platform_jobs
INFO:root:USERDEBUG ReconnectingConsumer._setup_bindings
DEBUG:root:Setting up bindings for queue platform_jobs
DEBUG:root:Declaring binding for queue: platform_jobs ([{'routing_key': 'platform.orchestration.jobs'}])
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Queue.Bind(['arguments={}', 'exchange=platform', 'nowait=False', 'queue=platform_jobs', 'routing_key=platform.orchestration.jobs', 'ticket=0'])>; acceptable_replies=[<class 'pika.spec.Queue.BindOk'>]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': functools.partial(<bound method ReconnectingConsumer._on_bind_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'routing_key': 'platform.orchestration.jobs'}, 'platform_jobs')), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Queue.BindOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': functools.partial(<bound method ReconnectingConsumer._on_bind_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'routing_key': 'platform.orchestration.jobs'}, 'platform_jobs')), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Queue.BindOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling functools.partial(<bound method ReconnectingConsumer._on_bind_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata=({'routing_key': 'platform.orchestration.jobs'}, 'platform_jobs')) for "1:Queue.BindOk"
INFO:root:USERDEBUG ReconnectingConsumer._on_bind_ok
DEBUG:root:Binding ok for queue: platform_jobs ({'routing_key': 'platform.orchestration.jobs'})
INFO:root:USERDEBUG ReconnectingConsumer._start_consuming
DEBUG:root:Issuing consumer related RPC commands
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Basic.Consume(['arguments={}', 'consumer_tag=ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9', 'exclusive=False', 'no_ack=False', 'no_local=False', 'nowait=False', 'queue=platform_jobs', 'ticket=0'])>; acceptable_replies=[(<class 'pika.spec.Basic.ConsumeOk'>, {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'})]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 1}
DEBUG:root:Adding consumer cancellation callback
DEBUG:pika.callback:Added: {'callback': <bound method ReconnectingConsumer._on_consumer_cancelled of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Basic.ConsumeOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'} to {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'} to {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.ConsumeOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling <bound method Channel._on_eventok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.ConsumeOk"
DEBUG:pika.channel:Discarding frame <METHOD(['channel_number=1', 'frame_type=1', "method=<Basic.ConsumeOk(['consumer_tag=ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'])>"])>
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Received heartbeat frame
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6670> with deadline=602716.904673617 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602686.904673617; delay=30.0
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
INFO:root:USERDEBUG ReconnectingConsumer._on_message
DEBUG:root:Received message for queue platform_jobs with body test4
{'queue': 'platform_jobs', 'body': 'test4'}
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6b20> with deadline=602746.906641168 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602716.906641168; delay=30.0
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Received 1 heartbeat frames, sent 2, idle intervals 0
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6580> with deadline=602786.90237689 and callback=<bound method HeartbeatChecker._check_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602721.90237689; delay=65
>>>
>>> sensor.stop()
INFO:root:USERDEBUG ReconnectingConsumer.stop
INFO:root:Stopping
INFO:root:USERDEBUG ReconnectingConsumer._stop_consuming
DEBUG:root:Sending a Basic.Cancel RPC command to RabbitMQ for consumer ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9
DEBUG:pika.channel:Cancelling consumer: ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9 (nowait=False)
DEBUG:pika.callback:Added: {'callback': functools.partial(<bound method ReconnectingConsumer._on_consumer_cancelled_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata='ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Basic.Cancel(['consumer_tag=ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9', 'nowait=False'])>; acceptable_replies=[(<class 'pika.spec.Basic.CancelOk'>, {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'})]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_cancelok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 1}
DEBUG:pika.adapters.select_connection:Entering IOLoop
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Received heartbeat frame
DEBUG:pika.heartbeat:Received heartbeat frame
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.heartbeat:Sending heartbeat frame
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfb98e0> with deadline=602867.965903572 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>; now=602837.965903572; delay=30.0
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Basic.CancelOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': functools.partial(<bound method ReconnectingConsumer._on_consumer_cancelled_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata='ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'), 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'} to {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Comparing {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'} to {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_cancelok of <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': {'consumer_tag': 'ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9'}, 'calls': 0}
DEBUG:pika.callback:Calling functools.partial(<bound method ReconnectingConsumer._on_consumer_cancelled_ok of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>>, userdata='ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9') for "1:Basic.CancelOk"
DEBUG:root:RabbitMQ acknowledged the cancellation of the consumer: ctag1.356a4fe6f7a9465eb8a29c58e34fa1e9
INFO:root:USERDEBUG ReconnectingConsumer._close_channel
DEBUG:root:Closing channel
INFO:pika.channel:Closing channel (0): 'Normal shutdown' on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>
DEBUG:pika.channel:Already in blocking state, so enqueueing method <Channel.Close(['class_id=0', 'method_id=0', 'reply_code=0', 'reply_text=Normal shutdown'])>; acceptable_replies=[<class 'pika.spec.Channel.CloseOk'>]
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.CancelOk"
DEBUG:pika.channel:1 blocked frames
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.channel:Entering blocking state on frame <Channel.Close(['class_id=0', 'method_id=0', 'reply_code=0', 'reply_text=Normal shutdown'])>; acceptable_replies=[<class 'pika.spec.Channel.CloseOk'>]
DEBUG:pika.channel:Adding on_synchronous_complete callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.channel:Adding passed-in RPC response callback
DEBUG:pika.callback:Added: {'callback': <bound method Channel._on_closeok of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Calling <bound method Channel._on_cancelok of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Basic.CancelOk"
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 1:Channel.CloseOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_synchronous_complete of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Channel._on_closeok of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Channel._on_synchronous_complete of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.CloseOk"
DEBUG:pika.channel:0 blocked frames
DEBUG:pika.callback:Calling <bound method Channel._on_closeok of <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>> for "1:Channel.CloseOk"
INFO:pika.channel:Received <Channel.CloseOk> on <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>
DEBUG:pika.callback:Processing 1:_on_channel_close
DEBUG:pika.callback:Calling <bound method ReconnectingConsumer._on_channel_closed of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>> for "1:_on_channel_close"
WARNING:root:Channel 1 was closed: (0, 'Normal shutdown')
INFO:root:USERDEBUG ReconnectingConsumer._close_connection
DEBUG:root:Closing connection
DEBUG:pika.connection:New Connection state: CLOSING (prev=OPEN)
INFO:pika.connection:Closing connection (200): 'Normal shutdown'
INFO:pika.connection:Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>
DEBUG:pika.callback:Processing 1:_on_channel_cleanup
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Connection._on_channel_cleanup of <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': <Channel number=1 CLOSED conn=<SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Connection._on_channel_cleanup of <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>> for "1:_on_channel_cleanup"
DEBUG:pika.connection:Removed channel 1
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_close_ok of <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.set_writer(6, <bound method _AsyncPlaintextTransport._on_socket_writable of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:set_writer(6, _) updated handler RdWr
DEBUG:pika.adapters.utils.io_services_utils:Turned on writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Clearing out '1' from the stack
DEBUG:pika.adapters.utils.io_services_utils:Recv would block on <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) updated handler Rd
DEBUG:pika.adapters.utils.io_services_utils:Turned off writability watcher: <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.callback:Processing 0:Connection.CloseOk
DEBUG:pika.callback:Processing use of oneshot callback
DEBUG:pika.callback:0 registered uses left
DEBUG:pika.callback:Removing callback #0: {'callback': <bound method Connection._on_connection_close_ok of <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 0}
DEBUG:pika.callback:Calling <bound method Connection._on_connection_close_ok of <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730> params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>> for "0:Connection.CloseOk"
DEBUG:pika.connection:_on_connection_close_ok: frame=<METHOD(['channel_number=0', 'frame_type=1', 'method=<Connection.CloseOk>'])>
DEBUG:pika.heartbeat:Removing timer for next heartbeat send interval
DEBUG:pika.adapters.select_connection:remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfb98e0> with deadline=602867.965903572 and callback=<bound method HeartbeatChecker._send_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>
DEBUG:pika.heartbeat:Removing timer for next heartbeat check interval
DEBUG:pika.adapters.select_connection:remove_timeout: removing timeout <pika.adapters.select_connection._Timeout object at 0x7fccbdfc6be0> with deadline=602851.906386648 and callback=<bound method HeartbeatChecker._check_heartbeat of <pika.heartbeat.HeartbeatChecker object at 0x7fccbdfc6550>>
INFO:pika.adapters.utils.io_services_utils:Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
INFO:pika.adapters.utils.io_services_utils:_AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
INFO:pika.adapters.utils.io_services_utils:Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_reader(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_reader(6) removed handler
DEBUG:pika.adapters.utils.selector_ioloop_adapter:SelectorIOServicesAdapter.remove_writer(6)
DEBUG:pika.adapters.utils.selector_ioloop_adapter:remove_writer(6) neither was set.
DEBUG:pika.adapters.select_connection:add_callback_threadsafe: added callback=functools.partial(<bound method _AsyncTransportBase._connection_lost_notify_async of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>, None)
DEBUG:pika.adapters.utils.io_services_utils:Leaving Plaintext consumer due to inactive state: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:process_timeouts: invoking callback=functools.partial(<bound method _AsyncTransportBase._connection_lost_notify_async of <pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fccbdfc6730>>, None)
DEBUG:pika.adapters.utils.io_services_utils:Concluding transport shutdown: state=3; error=None
DEBUG:pika.adapters.base_connection:connection_lost: None
INFO:pika.connection:AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
DEBUG:pika.connection:New Connection state: CLOSED (prev=CLOSING)
INFO:pika.connection:Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
DEBUG:pika.callback:Processing 0:_on_connection_closed
DEBUG:pika.callback:Calling <bound method ReconnectingConsumer._on_connection_close of <test.ReconnectingConsumer object at 0x7fccbe85b2e0>> for "0:_on_connection_closed"
INFO:root:USERDEBUG ReconnectingConsumer._on_connection_close
DEBUG:pika.adapters.select_connection:Stopping IOLoop
DEBUG:pika.connection:New Connection state: CLOSED (prev=CLOSED)
DEBUG:pika.callback:Added: {'callback': <bound method Connection._on_connection_start of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Incremented callback reference counter: {'callback': <bound method Connection._on_connection_close_from_broker of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 2}
INFO:pika.adapters.utils.io_services_utils:Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.11', 33440), raddr=('172.16.104.96', 5672)>
DEBUG:pika.adapters.select_connection:Deactivating poller
INFO:root:Stopped

Run Sensor code with ST2 Wrapping

Then I stopped the independent execution (logging included above) and ran the same sensor logic, but using StackStorm internals.

/opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --config-file=/etc/st2/st2.docker.conf --config-file=/etc/st2/st2.user.conf --single-sensor-mode --sensor-ref=rabbitmq.RabbitMQBoundQueueSensor --debug

In the log below, the last line that appears to come from the sensor code in either execution is:

_try_next_config_async: 'rabbitmq.service.us-west-2.consul':5672

This message is present in both logs.

st2sensorcontainer --single-sensor-mode

# /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --config-file=/etc/st2/st2.docker.conf --config-file=/etc/st2/st2.user.conf --single-sensor-mode --sensor-ref=rabbitmq.RabbitMQBoundQueueSensor --debug
INFO [-] Using Python: 3.8.10 (/opt/stackstorm/st2/bin/python)
INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: en_US.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
INFO [-] Using config files: /etc/st2/st2.conf,/etc/st2/st2.docker.conf,/etc/st2/st2.user.conf
INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
INFO [-] Using coordination driver: redis
INFO [-] Using metrics driver: noop
INFO [-] Connecting to database "st2" @ "mongo-db.service.us-west-2.consul:27017" as user "st2".
INFO [-] Successfully connected to database "st2" @ "mongo-db.service.us-west-2.consul:27017" as user "st2".
DEBUG [-] Ensuring database indexes...
DEBUG [-] Skipping index cleanup for blacklisted model "PermissionGrantDB"...
DEBUG [-] Indexes are ensured for models: ActionAliasDB, ActionAliasDB, ActionDB, ActionExecutionDB, ActionExecutionDB, ActionExecutionOutputDB, ActionExecutionSchedulingQueueItemDB, ActionExecutionStateDB, ActionExecutionStateDB, ApiKeyDB, ConfigDB, ConfigSchemaDB, GroupToRoleMappingDB, KeyValuePairDB, LiveActionDB, LiveActionDB, PackDB, PermissionGrantDB, PolicyDB, PolicyTypeDB, RoleDB, RuleDB, RuleEnforcementDB, RunnerTypeDB, RunnerTypeDB, SensorTypeDB, TaskExecutionDB, TokenDB, TraceDB, TriggerDB, TriggerInstanceDB, TriggerTypeDB, UserDB, UserRoleAssignmentDB, WorkflowExecutionDB
DEBUG [-] Registering exchanges...
DEBUG [-] Using SSL context for RabbitMQ connection: {}
DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@rtc-us-west-2b-dev-9af50', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.2', 'product': 'RabbitMQ', 'version': '3.10.13'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
DEBUG [-] using channel_id: 1
DEBUG [-] Channel open
DEBUG [-] Registered exchange st2.actionexecutionstate ({'exchange': 'st2.actionexecutionstate', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.announcement ({'exchange': 'st2.announcement', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.execution ({'exchange': 'st2.execution', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.liveaction ({'exchange': 'st2.liveaction', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.liveaction.status ({'exchange': 'st2.liveaction.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.trigger ({'exchange': 'st2.trigger', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.trigger_instances_dispatch ({'exchange': 'st2.trigger_instances_dispatch', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.sensor ({'exchange': 'st2.sensor', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.workflow ({'exchange': 'st2.workflow', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Registered exchange st2.workflow.status ({'exchange': 'st2.workflow.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
DEBUG [-] Closed channel #1
DEBUG [-] using channel_id: 1
DEBUG [-] Channel open
DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
DEBUG [-] Predeclaring queue for exchange "st2.execution"
DEBUG [-] Predeclared queue for exchange "st2.execution"
DEBUG [-] Predeclaring queue for exchange "st2.actionexecutionstate"
DEBUG [-] Predeclared queue for exchange "st2.actionexecutionstate"
DEBUG [-] Predeclaring queue for exchange "st2.trigger_instances_dispatch"
DEBUG [-] Predeclared queue for exchange "st2.trigger_instances_dispatch"
DEBUG [-] Predeclaring queue for exchange "st2.announcement"
DEBUG [-] Predeclared queue for exchange "st2.announcement"
DEBUG [-] Predeclaring queue for exchange "st2.execution"
DEBUG [-] Predeclared queue for exchange "st2.execution"
DEBUG [-] Predeclaring queue for exchange "st2.liveaction"
DEBUG [-] Predeclared queue for exchange "st2.liveaction"
DEBUG [-] Predeclaring queue for exchange "st2.execution.output"
DEBUG [-] Predeclared queue for exchange "st2.execution.output"
DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
DEBUG [-] Predeclaring queue for exchange "st2.trigger"
DEBUG [-] Predeclared queue for exchange "st2.trigger"
DEBUG [-] Predeclaring queue for exchange "st2.sensor"
DEBUG [-] Predeclared queue for exchange "st2.sensor"
DEBUG [-] Closed channel #1
DEBUG [-] Inserting system roles (['system_admin', 'admin', 'observer'])
DEBUG [-] found extension EntryPoint.parse('echo = st2common.metrics.drivers.echo_driver:EchoDriver')
DEBUG [-] found extension EntryPoint.parse('noop = st2common.metrics.drivers.noop_driver:NoopDriver')
DEBUG [-] found extension EntryPoint.parse('statsd = st2common.metrics.drivers.statsd_driver:StatsdDriver')
INFO [-] Running in single sensor mode, using a single sensor partitioner...
INFO [-] Setting up container to run 1 sensors.
INFO [-] Sensors list - ['rabbitmq.RabbitMQBoundQueueSensor'].
INFO [-] (PID:297) SensorContainer started.
DEBUG [-] Using SSL context for RabbitMQ connection: {}
DEBUG [-] Starting sensor CUD watcher...
DEBUG [-] Using SSL context for RabbitMQ connection: {}
INFO [-] Running sensor rabbitmq.RabbitMQBoundQueueSensor
DEBUG [-] Creating temporary auth token for sensor RabbitMQBoundQueueSensor
AUDIT [-] Access granted to "sensors_container" with the token set to expire at "2023-01-18T20:50:03.885684Z". (username='sensors_container',token_expiration='2023-01-18T20:50:03.885684Z')
DEBUG [-] Running sensor subprocess (cmd="/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/bound_queues_sensor.py --class-name=RabbitMQBoundQueueSensor --trigger-type-refs=rabbitmq.routed_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--single-sensor-mode", "--sensor-ref=rabbitmq.RabbitMQBoundQueueSensor", "--debug"]")
DEBUG [-] Dispatching trigger (trigger=core.st2.sensor.process_spawn,payload={'trigger': 'core.st2.sensor.process_spawn', 'payload': {'id': 'RabbitMQBoundQueueSensor', 'timestamp': 1673988603, 'pid': 299, 'cmd': '/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/bound_queues_sensor.py --class-name=RabbitMQBoundQueueSensor --trigger-type-refs=rabbitmq.routed_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--single-sensor-mode", "--sensor-ref=rabbitmq.RabbitMQBoundQueueSensor", "--debug"]'}, 'trace_context': None})
DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@rtc-us-west-2b-dev-9af50', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.2', 'product': 'RabbitMQ', 'version': '3.10.13'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
INFO [-] Connected to amqp://st2:**@rabbitmq.service.us-west-2.consul:5672//
DEBUG [-] using channel_id: 1
DEBUG [-] Channel open
DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@rtc-us-west-2c-dev-b8df4', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.2', 'product': 'RabbitMQ', 'version': '3.10.13'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
DEBUG [-] using channel_id: 1
DEBUG [-] Channel open
DEBUG [-] Closed channel #1
INFO [-] Sensor rabbitmq.RabbitMQBoundQueueSensor started
DEBUG [-] 1 active sensor(s)
DEBUG [-] Using SSL context for RabbitMQ connection: {}
INFO [-] Found config for sensor "RabbitMQBoundQueueSensor"
DEBUG [-] Using SSL context for RabbitMQ connection: {}
INFO [-] Watcher started
INFO [-] Running sensor initialization code
DEBUG [-] Using PollPoller
DEBUG [-] New Connection state: CLOSED (prev=CLOSED)
DEBUG [-] Added: {'callback': <bound method Connection._on_connection_start of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG [-] Added: {'callback': <bound method Connection._on_connection_close_from_broker of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq.service.us-west-2.consul port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_open_error of <bound_queues_sensor.ReconnectingConsumer object at 0x7f76a8cc9190>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_open of <bound_queues_sensor.ReconnectingConsumer object at 0x7f76a8cc9190>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_close of <bound_queues_sensor.ReconnectingConsumer object at 0x7f76a8cc9190>>, 'one_shot': False, 'only': None, 'arguments': None}
DEBUG [-] New Connection state: INIT (prev=CLOSED)
DEBUG [-] Starting AMQP Connection workflow asynchronously.
DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f76a8ce7700> with deadline=602968.959868794 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f76a8cc9250>>, first=True); now=602968.959868794; delay=0
INFO [-] Running sensor in passive mode
DEBUG [-] Entering IOLoop
DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f76a8cc9a00> with deadline=602968.960647811 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f76a8cc9250>>; now=602968.960647811; delay=0
DEBUG [-] _try_next_config_async: 'rabbitmq.service.us-west-2.consul':5672
DEBUG [-] Found existing trigger: TriggerDB(description=None, id=63c06492e1745375f9e6d07d, metadata_file=None, name="routed_message", pack="rabbitmq", parameters={}, ref="rabbitmq.routed_message", ref_count=0, type="rabbitmq.routed_message", uid="trigger:rabbitmq:routed_message:99914b932bd37a50b983c5e7c90ae93b") in db.
DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.routed_message)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] 1 active sensor(s)
DEBUG [-] Deactivating poller
INFO [-] Container shutting down. Invoking cleanup on sensors.
ERROR [-] Process container stopped.
INFO [-] All sensors are shut down.
DEBUG [-] Shutting down sensor watcher.
DEBUG [-] Closed channel #1
DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@rtc-us-west-2b-dev-9af50', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.2', 'product': 'RabbitMQ', 'version': '3.10.13'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
DEBUG [-] using channel_id: 1
DEBUG [-] Channel open
DEBUG [-] Closed channel #1
INFO [-] (PID:297) SensorContainer stopped. Reason - KeyboardInterrupt

More digging to do.
-
Finally making some good headway... and I think I found smoke.

What appears to be happening is that pika is having issues with hostname resolution when running inside st2sensorcontainer inside Docker.

Changing the sensor config to use an IP instead of a hostname now allows me to start/stop the st2sensorcontainer binary, as well as restart (in place) the Docker container. I can toggle host: "rabbitmq" to and from host: "<ip>" and replicate reliably.

I need to dive into pika's resolution logic next to try and see what's going on and how it may be getting chomped by st2sensorcontainer. I think I'll be starting here, since this is where the logic seems to be halting:

[-] process_timeouts: invoking callback=<bound method _AddressResolver._dispatch_result of <pika.adapters.utils.selector_ioloop_adapter._AddressResolver object at 0x7f7acc4b3d90>>

@armab I'm curious if you have ideas on that part specifically? Without knowing how pika is doing resolution, what could be happening in
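For anyone who wants to rule out plain DNS problems inside the container first, here is a minimal sketch of a standalone check. It does not use pika or st2 at all. It only calls the standard library's socket.getaddrinfo from a worker thread and waits with a timeout, so a lookup that never returns shows up as a timeout instead of a silent hang. The helper name resolve_in_thread and the 5 second timeout are my own choices for illustration, not anything taken from pika or StackStorm.

```python
import socket
import threading


def resolve_in_thread(host, port, timeout=5.0):
    """Resolve host:port in a worker thread.

    Returns a sorted list of unique IP strings, or None if the lookup
    did not finish within `timeout` seconds. Re-raises lookup errors.
    (Hypothetical helper for troubleshooting; not part of pika or st2.)
    """
    result = {}

    def worker():
        try:
            result["addrs"] = socket.getaddrinfo(
                host, port, type=socket.SOCK_STREAM
            )
        except OSError as exc:  # socket.gaierror is a subclass of OSError
            result["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        # The lookup is still blocked; report it as a timeout.
        return None
    if "error" in result:
        raise result["error"]
    # Each getaddrinfo entry is (family, type, proto, canonname, sockaddr);
    # sockaddr[0] is the IP address for both IPv4 and IPv6.
    return sorted({info[4][0] for info in result["addrs"]})
```

Running resolve_in_thread("rabbitmq", 5672) from the rabbitmq pack's virtualenv python inside the st2sensorcontainer container, and again on the standalone install, should show whether the hostname resolves at all in each environment. If it resolves fine here but the sensor still stalls, that would point at how the lookup result gets delivered back inside the sensor process rather than at DNS itself.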
-
PR for new |