Skip to content

Commit

Permalink
flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Sep 12, 2014
1 parent 258019e commit af37a86
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 24 deletions.
11 changes: 8 additions & 3 deletions examples/nsq_to_nsq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
from host_pool import HostPool


class NSQProxy:
def __init__(self, topic, nsqds):
self.topic = topic
Expand All @@ -18,7 +19,8 @@ def __init__(self, topic, nsqds):
def relay(self, nsq_message):
nsq_message.enable_async()
writer = self.writer_pool.get()
callback = functools.partial(self._on_message_response, nsq_message=nsq_message, writer=writer)
callback = functools.partial(
self._on_message_response, nsq_message=nsq_message, writer=writer)
writer.pub(self.topic, nsq_message.body, callback)

def _on_message_response(self, conn, data, nsq_message, writer):
Expand All @@ -30,6 +32,7 @@ def _on_message_response(self, conn, data, nsq_message, writer):
self.writer_pool.success(writer)
nsq_message.finish()


if __name__ == "__main__":
tornado.options.define('destination_topic', type=str)
tornado.options.define('topic', type=str)
Expand All @@ -45,8 +48,10 @@ def _on_message_response(self, conn, data, nsq_message, writer):
assert tornado.options.options.destination_nsqd_tcp_address
assert tornado.options.options.channel

destination_topic = str(tornado.options.options.destination_topic or tornado.options.options.topic)
lookupd_http_addresses = map(lambda addr: 'http://' + addr, tornado.options.options.lookupd_http_address)
destination_topic = str(tornado.options.options.destination_topic or
tornado.options.options.topic)
lookupd_http_addresses = map(lambda addr: 'http://' + addr,
tornado.options.options.lookupd_http_address)

proxy = NSQProxy(destination_topic, tornado.options.options.destination_nsqd_tcp_address)

Expand Down
3 changes: 1 addition & 2 deletions nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def __init__(self, host, port, timeout=1.0, heartbeat_interval=30, requeue_delay
if self.user_agent is None:
self.user_agent = 'pynsq/%s' % __version__

self._authentication_required = False # tracking server auth state
self._authentication_required = False # tracking server auth state
self.auth_secret = auth_secret
super(AsyncConn, self).__init__()

Expand Down Expand Up @@ -404,7 +404,6 @@ def _on_message_requeue(self, message, backoff=True, time_ms=-1, **kwargs):
self.trigger('backoff', conn=self)
else:
self.trigger('continue', conn=self)


def _on_message_finish(self, message, **kwargs):
self.in_flight -= 1
Expand Down
6 changes: 3 additions & 3 deletions nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def _on_connection_identify_response(self, conn, data, **kwargs):
logger.info('[%s:%s] IDENTIFY received %r' % (conn.id, self.name, data))
if conn.tls_v1 and not data.get('tls_v1'):
logger.warning('[%s:%s] tls_v1 requested but disabled, could not negotiate feature',
conn.id, self.name)
conn.id, self.name)
if conn.snappy and not data.get('snappy'):
logger.warning('[%s:%s] snappy requested but disabled, could not negotiate feature',
conn.id, self.name)
conn.id, self.name)

def _on_connection_auth(self, conn, data, **kwargs):
logger.info('[%s:%s] AUTH sent' % (conn.id, self.name))
Expand Down Expand Up @@ -61,7 +61,7 @@ def is_stale(conn):
# this connection hasn't received data for more than
# the configured heartbeat interval, close it
logger.warning('[%s:%s] connection is stale (%.02fs), closing',
conn.id, self.name, (now - timestamp))
conn.id, self.name, (now - timestamp))
conn.close()

def _on_heartbeat(self, conn):
Expand Down
4 changes: 3 additions & 1 deletion nsq/nsq.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, msg, error=None):

def __str__(self):
    # Human-readable form: "SendError: <msg> (<error>)", built from the
    # instance's msg/error attributes (set in __init__, not visible here).
    return 'SendError: %s (%s)' % (self.msg, self.error)

def __repr__(self):
    # NOTE(review): identical text to __str__ — repr deliberately mirrors
    # the human-readable form rather than an eval-able expression.
    return 'SendError: %s (%s)' % (self.msg, self.error)

Expand Down Expand Up @@ -75,9 +75,11 @@ def subscribe(topic, channel):
def identify(data):
    """Build an IDENTIFY command whose body is *data* serialized as JSON."""
    payload = json.dumps(data)
    return _command('IDENTIFY', payload)


def auth(data):
    """Build an AUTH command carrying *data* verbatim as its body."""
    cmd = _command('AUTH', data)
    return cmd


def ready(count):
assert isinstance(count, int), 'ready count must be an integer'
assert count >= 0, 'ready count cannot be negative'
Expand Down
26 changes: 14 additions & 12 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def _handle_message(self, conn, message):
rdy_conn = random.choice(conns_with_no_rdy)
if rdy_conn is not conn:
logger.info('[%s:%s] redistributing RDY to %s',
conn.id, self.name, rdy_conn.id)
conn.id, self.name, rdy_conn.id)

self._maybe_update_rdy(rdy_conn)

Expand All @@ -305,7 +305,7 @@ def _handle_message(self, conn, message):
success = self.process_message(message)
except Exception:
logger.exception('[%s:%s] uncaught exception while handling message %s body:%r',
conn.id, self.name, message.id, message.body)
conn.id, self.name, message.id, message.body)
if not message.has_responded():
return message.requeue()

Expand Down Expand Up @@ -349,14 +349,14 @@ def _on_backoff_resume(self, success, **kwargs):
self.backoff_timer.success()
elif success is False and not self.backoff_block:
self.backoff_timer.failure()

self._enter_continue_or_exit_backoff()

def _complete_backoff_block(self):
self.backoff_block_completed = True
rdy = self._connection_max_in_flight()
logger.info('[%s] backoff complete, resuming normal operation (%d connections)',
self.name, len(self.conns))
self.name, len(self.conns))
for c in self.conns.values():
self._send_rdy(c, rdy)

Expand Down Expand Up @@ -388,7 +388,7 @@ def _start_backoff_block(self):
backoff_interval = self.backoff_timer.get_interval()

logger.info('[%s] backing off for %0.2f seconds (%d connections)',
self.name, backoff_interval, len(self.conns))
self.name, backoff_interval, len(self.conns))
for c in self.conns.values():
self._send_rdy(c, 0)

Expand Down Expand Up @@ -551,19 +551,19 @@ def query_lookupd(self):
def _finish_query_lookupd(self, response, lookupd_url):
if response.error:
logger.warning('[%s] lookupd %s query error: %s',
self.name, lookupd_url, response.error)
self.name, lookupd_url, response.error)
return

try:
lookup_data = json.loads(response.body)
except ValueError:
logger.warning('[%s] lookupd %s failed to parse JSON: %r',
self.name, lookupd_url, response.body)
self.name, lookupd_url, response.body)
return

if lookup_data['status_code'] != 200:
logger.warning('[%s] lookupd %s responded with %d',
self.name, lookupd_url, lookup_data['status_code'])
self.name, lookupd_url, lookup_data['status_code'])
return

for producer in lookup_data['data']['producers']:
Expand All @@ -590,13 +590,13 @@ def _redistribute_rdy_state(self):
if len(self.conns) > self.max_in_flight:
self.need_rdy_redistributed = True
logger.debug('redistributing RDY state (%d conns > %d max_in_flight)',
len(self.conns), self.max_in_flight)
len(self.conns), self.max_in_flight)

backoff_interval = self.backoff_timer.get_interval()
if backoff_interval and len(self.conns) > 1:
self.need_rdy_redistributed = True
logger.debug('redistributing RDY state (%d backoff interval and %d conns > 1)',
backoff_interval, len(self.conns))
backoff_interval, len(self.conns))

if self.need_rdy_redistributed:
self.need_rdy_redistributed = False
Expand All @@ -606,7 +606,7 @@ def _redistribute_rdy_state(self):
for conn_id, conn in self.conns.iteritems():
last_message_duration = time.time() - conn.last_msg_timestamp
logger.debug('[%s:%s] rdy: %d (last message received %.02fs)',
conn.id, self.name, conn.rdy, last_message_duration)
conn.id, self.name, conn.rdy, last_message_duration)
if conn.rdy > 0 and last_message_duration > self.low_rdy_idle_timeout:
logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name)
self._send_rdy(conn, 0)
Expand Down Expand Up @@ -657,7 +657,7 @@ def giving_up(self, message):
:param message: the :class:`nsq.Message` received
"""
logger.warning('[%s] giving up on message %s after %d tries (max:%d) %r',
self.name, message.id, message.attempts, self.max_tries, message.body)
self.name, message.id, message.attempts, self.max_tries, message.body)

def disabled(self):
"""
Expand All @@ -674,6 +674,7 @@ def validate_message(self, message):
def preprocess_message(self, message):
    # Returns the message unchanged; presumably a hook for subclasses to
    # transform each message before it is handled — confirm against callers.
    return message


def _utf8_params(params):
"""encode a dictionary of URL parameters (including iterables) as utf-8"""
assert isinstance(params, dict)
Expand All @@ -690,6 +691,7 @@ def _utf8_params(params):
encoded_params.append((k, v))
return dict(encoded_params)


def _utf8(s):
"""encode a unicode string as utf-8"""
if isinstance(s, unicode):
Expand Down
7 changes: 4 additions & 3 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def test_backoff_out_of_order():
]
assert conn2.stream.write.call_args_list == [((arg,),) for arg in expected_args]


def test_backoff_requeue_recovery():
mock_ioloop = create_autospec(IOLoop)
r = _get_reader(mock_ioloop, max_in_flight=2)
Expand All @@ -149,7 +150,7 @@ def test_backoff_requeue_recovery():

msg = _send_message(conn)

# go into backoff,
# go into backoff
msg.trigger('requeue', message=msg)
assert r.backoff_block is True
assert r.backoff_timer.get_interval() > 0
Expand All @@ -160,7 +161,7 @@ def test_backoff_requeue_recovery():
timeout_args[1]()
assert r.backoff_block is False
assert r.backoff_timer.get_interval() != 0

msg = _send_message(conn)

# This should not move out of backoff (since backoff=False)
Expand All @@ -180,7 +181,7 @@ def test_backoff_requeue_recovery():
msg.trigger('finish', message=msg)
assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0

print conn.stream.write.call_args_list

expected_args = [
Expand Down

0 comments on commit af37a86

Please sign in to comment.