Skip to content

Commit

Permalink
*: deflake
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed May 5, 2017
1 parent 7a8b47b commit 237613a
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 43 deletions.
28 changes: 13 additions & 15 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ def query_lookupd(self):

req = tornado.httpclient.HTTPRequest(
lookupd_url, method='GET',
headers={"Accept":"application/vnd.nsq; version=1.0"},
headers={'Accept': 'application/vnd.nsq; version=1.0'},
connect_timeout=self.lookupd_connect_timeout,
request_timeout=self.lookupd_request_timeout)
callback = functools.partial(self._finish_query_lookupd, lookupd_url=lookupd_url)
Expand All @@ -607,12 +607,12 @@ def _finish_query_lookupd(self, response, lookupd_url):
address = producer.get('broadcast_address', producer.get('address'))
assert address
self.connect_to_nsqd(address, producer['tcp_port'])

def set_max_in_flight(self, max_in_flight):
"""dynamically adjust the reader max_in_flight count. Set to 0 to immediately disable a Reader"""
assert isinstance(max_in_flight, int)
self.max_in_flight = max_in_flight

if max_in_flight == 0:
# set RDY 0 to all connections
for conn in itervalues(self.conns):
Expand All @@ -623,8 +623,7 @@ def set_max_in_flight(self, max_in_flight):
else:
self.need_rdy_redistributed = True
self._redistribute_rdy_state()



def _redistribute_rdy_state(self):
# We redistribute RDY counts in a few cases:
#
Expand Down Expand Up @@ -672,7 +671,7 @@ def _redistribute_rdy_state(self):
max_in_flight = self.max_in_flight - self.total_rdy

# randomly walk the list of possible connections and send RDY 1 (up to our
# calculate "max_in_flight"). We only need to send RDY 1 because in both
# calculated "max_in_flight"). We only need to send RDY 1 because in both
# cases described above your per connection RDY count would never be higher.
#
# We also don't attempt to avoid the connections who previously might have had RDY 1
Expand Down Expand Up @@ -713,8 +712,7 @@ def giving_up(self, message):
"""
logger.warning('[%s] giving up on message %s after %d tries (max:%d) %r',
self.name, message.id, message.attempts, self.max_tries, message.body)



def _on_connection_identify_response(self, conn, data, **kwargs):
if not hasattr(self, '_disabled_notice'):
self._disabled_notice = True
Expand All @@ -725,24 +723,24 @@ def cast(x):
return int(x)
except:
return x
return [cast(x) for x in v.replace('-','.').split('.')]
return [cast(x) for x in v.replace('-', '.').split('.')]

if self.disabled.__code__ != Reader.disabled.__code__ and semver(data['version']) >= semver('0.3'):
logging.warning('disabled() deprecated and incompatible with nsqd >= 0.3. ' +
'It will be removed in a future release. Use set_max_in_flight(0) instead')
if self.disabled.__code__ != Reader.disabled.__code__ and \
semver(data['version']) >= semver('0.3'):
logging.warning('disabled() deprecated and incompatible with nsqd >= 0.3. ' +
'It will be removed in a future release. Use set_max_in_flight(0) instead')
warnings.warn('disabled() is deprecated and will be removed in a future release, ' +
'use set_max_in_flight(0) instead', DeprecationWarning)
'use set_max_in_flight(0) instead', DeprecationWarning)
return super(Reader, self)._on_connection_identify_response(conn, data, **kwargs)


@classmethod
def disabled(cls):
"""
Called as part of RDY handling to identify whether this Reader has been disabled
This is useful to subclass and override to examine a file on disk or a key in cache
to identify if this reader should pause execution (during a deploy, etc.).
Note: deprecated. Use set_max_in_flight(0)
"""
return False
Expand Down
12 changes: 6 additions & 6 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ def test_backoff_hard():
assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0

for i, call in enumerate(conn.stream.write.call_args_list):
print("%d: %s" % (i, call))
for i, f in enumerate(conn.stream.write.call_args_list):
print("%d: %s" % (i, f))
assert conn.stream.write.call_args_list == [call(arg) for arg in expected_args]


Expand Down Expand Up @@ -383,8 +383,8 @@ def test_backoff_many_conns():
assert r.backoff_timer.get_interval() == 0

for c in conns:
for i, call in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, call))
for i, f in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, f))
assert c.stream.write.call_args_list == [call(arg) for arg in c.expected_args]


Expand Down Expand Up @@ -490,6 +490,6 @@ def test_backoff_conns_disconnect():
assert r.backoff_timer.get_interval() == 0

for c in conns:
for i, call in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, call))
for i, f in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, f))
assert c.stream.write.call_args_list == [call(arg) for arg in c.expected_args]
6 changes: 2 additions & 4 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ def pytest_generate_tests(metafunc):
('invalid_name_due_to_length_this_is_really_really_really_really_long', False),
('test-with_period.', True),
('test#ephemeral', True),
('test:ephemeral', False),
]:
('test:ephemeral', False)]:
metafunc.addcall(funcargs=dict(name=name, good=good))
if metafunc.function == test_channel_names:
for name, good in [
('test', True),
('test-with_period.', True),
('test#ephemeral', True),
('invalid_name_due_to_length_this_is_really_really_really_really_long', False),
('invalid name with space', False),
]:
('invalid name with space', False)]:
metafunc.addcall(funcargs=dict(name=name, good=good))


Expand Down
3 changes: 1 addition & 2 deletions tests/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def pytest_generate_tests(metafunc):
b'PUB test\n' + struct_l.pack(len(msgs[0])) + to_bytes(msgs[0])),
(protocol.mpub,
{'topic': 'test', 'data': msgs},
b'MPUB test\n' + struct_l.pack(len(mpub_body)) + to_bytes(mpub_body))
]:
b'MPUB test\n' + struct_l.pack(len(mpub_body)) + to_bytes(mpub_body))]:
metafunc.addcall(funcargs=dict(cmd_method=cmd_method, kwargs=kwargs, result=result))


Expand Down
23 changes: 9 additions & 14 deletions tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def setUp(self):
super(IntegrationBase, self).setUp()
if not hasattr(self, 'processes'):
self.processes = []

if self.nsqlookupd_command:
proc = subprocess.Popen(self.nsqlookupd_command)
self.processes.append(proc)
Expand All @@ -42,14 +42,12 @@ def setUp(self):
self.processes.append(proc)
self.wait_ping('http://127.0.0.1:4151/ping')


def tearDown(self):
super(IntegrationBase, self).tearDown()
for proc in self.processes:
os.kill(proc.pid, signal.SIGKILL)
proc.wait()


def wait_ping(self, endpoint):
start = time.time()
http = tornado.httpclient.HTTPClient()
Expand All @@ -65,7 +63,6 @@ def wait_ping(self, endpoint):
time.sleep(0.1)
continue


def _send_messages(self, topic, count, body):
c = AsyncConn('127.0.0.1', 4150, io_loop=self.io_loop)
c.connect()
Expand Down Expand Up @@ -96,7 +93,7 @@ def test_bad_reader_arguments(self):
topic = 'test_reader_msgs_%s' % time.time()
bad_options = dict(self.identify_options)
bad_options.update(dict(foo=10))
handler = lambda x: None
handler = lambda x: None # noqa

self.assertRaises(
AssertionError,
Expand Down Expand Up @@ -152,7 +149,6 @@ def _on_ready(*args, **kwargs):
assert response['conn'] is c
assert response['data'] == b'OK'


def test_conn_messages(self):
self.msg_count = 0

Expand Down Expand Up @@ -284,15 +280,14 @@ def handler(msg):
return True

r = Reader(lookupd_http_addresses=['http://127.0.0.1:4161'],
topic=topic,
channel='ch',
io_loop=self.io_loop,
message_handler=handler,
lookupd_poll_interval=1,
lookupd_poll_jitter=0.01,
max_in_flight=100,
topic=topic,
channel='ch',
io_loop=self.io_loop,
message_handler=handler,
lookupd_poll_interval=1,
lookupd_poll_jitter=0.01,
max_in_flight=100,
**self.identify_options)

self.wait()
r.close()

6 changes: 4 additions & 2 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def test_sync_authenticate_subscribe():
c = sync.SyncConn()
c.connect("127.0.0.1", 4150)

c.send(protocol.identify({'short_id': 'test', 'long_id': 'test.example', 'client_id': 'test', 'hostname':'test.example'}))
c.send(protocol.identify({'short_id': 'test', 'long_id': 'test.example',
'client_id': 'test', 'hostname': 'test.example'}))
c.send(protocol.subscribe('test', 'ch'))

mock_response_write(c, protocol.FRAME_TYPE_RESPONSE, b'OK')
Expand All @@ -54,7 +55,8 @@ def test_sync_receive_messages():
c = sync.SyncConn()
c.connect("127.0.0.1", 4150)

c.send(protocol.identify({'short_id': 'test', 'long_id': 'test.example', 'client_id': 'test', 'hostname':'test.example'}))
c.send(protocol.identify({'short_id': 'test', 'long_id': 'test.example',
'client_id': 'test', 'hostname': 'test.example'}))
c.send(protocol.subscribe('test', 'ch'))

mock_response_write(c, protocol.FRAME_TYPE_RESPONSE, b'OK')
Expand Down
8 changes: 8 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[pytest]
norecursedirs = .git

[flake8]
max_line_length = 100
exclude = .git
# http://pep8.readthedocs.org/en/latest/intro.html#error-codes
ignore = E261,E265,E402

0 comments on commit 237613a

Please sign in to comment.