Skip to content

Commit

Permalink
Merge pull request #174 from panaetov/feature/active_connection_polling
Browse files Browse the repository at this point in the history
writer: discard closed connections in pub
  • Loading branch information
mreiferson authored May 4, 2017
2 parents ae4d6a5 + 5997466 commit 7a8b47b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
11 changes: 10 additions & 1 deletion nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,17 @@ def id(self):
def __str__(self):
return self.host + ':' + str(self.port)

def connected(self):
return self.state == CONNECTED

def connecting(self):
return self.state == CONNECTING

def closed(self):
return self.state in (INIT, DISCONNECTED)

def connect(self):
if self.state not in [INIT, DISCONNECTED]:
if not self.closed():
return

self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
11 changes: 8 additions & 3 deletions nsq/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,15 @@ def _pub(self, command, topic, msg, delay_ms=None, callback=None):
callback = functools.partial(self._finish_pub, command=command,
topic=topic, msg=msg)

if not self.conns:
callback(None, protocol.SendError('no connections'))
open_connections = [
conn for conn in self.conns.values()
if conn.connected()
]
if not open_connections:
callback(None, protocol.SendError('no open connections'))
return

conn = random.choice(list(self.conns.values()))
conn = random.choice(open_connections)
conn.callback_queue.append(callback)
cmd = getattr(protocol, command)

Expand All @@ -145,6 +149,7 @@ def _pub(self, command, topic, msg, delay_ms=None, callback=None):
conn.send(cmd(*args))
except Exception:
logger.exception('[%s] failed to send %s' % (conn.id, command))
callback(None, protocol.SendError('send error'))
conn.close()

def _on_connection_error(self, conn, error, **kwargs):
Expand Down

0 comments on commit 7a8b47b

Please sign in to comment.