@@ -180,6 +180,7 @@ def connect_pool(
180180 connected. This can be checked with `client.is_connected()`.
181181 """
182182 assert self .is_connected () is False
183+ assert self ._reconnecting is False
183184 if len (auth ) == 1 :
184185 auth = auth [0 ]
185186
@@ -229,21 +230,19 @@ def connect(
229230 self ._pool_idx = 0
230231 return self ._connect (timeout = timeout )
231232
232- async def reconnect (self ) -> None :
233+ def reconnect (self ) -> Optional [ asyncio . Future ] :
233234 """Re-connect to ThingsDB.
234235
235236 This method can be used, even when a connection still exists. In case
236237 of a connection pool, a call to `reconnect()` will switch to another
237- node.
238+ node. If the client is already re-connecting, this method returns None,
239+ otherwise, the reconnect Future is returned, await of the Future is
240+ possible but not required.
238241 """
239242 if self ._reconnecting :
240243 return
241-
242244 self ._reconnecting = True
243- try :
244- await self ._reconnect_loop ()
245- finally :
246- self ._reconnecting = False
245+ return asyncio .ensure_future (self ._reconnect_loop (), loop = self ._loop )
247246
248247 async def wait_closed (self ) -> None :
249248 """Wait for a connection to close.
@@ -342,6 +341,7 @@ async def _ensure_write(
342341 while True :
343342 if not self .is_connected ():
344343 logging .info ('Wait for a connection' )
344+ self .reconnect () # ensure the re-connect loop
345345 await asyncio .sleep (1.0 )
346346 continue
347347
@@ -572,7 +572,7 @@ def _on_event(self, pkg):
572572 status , node_id = pkg .data ['status' ], pkg .data ['id' ]
573573
574574 if self ._reconnect and status == 'SHUTTING_DOWN' :
575- asyncio . ensure_future ( self .reconnect (), loop = self . _loop )
575+ self .reconnect ()
576576
577577 logging .debug (
578578 f'Node with Id { node_id } has changed its status to: { status } ' )
@@ -596,35 +596,38 @@ def _on_connection_lost(self, protocol, exc):
596596 return
597597 self ._protocol = None
598598 if self ._reconnect :
599- asyncio . ensure_future ( self .reconnect (), loop = self . _loop )
599+ self .reconnect ()
600600
601601 async def _reconnect_loop (self ):
602- wait_time = 1
603- timeout = 2
604- protocol = self ._protocol
605- while True :
606- host , port = self ._pool [self ._pool_idx ]
607- try :
608- await self ._connect (timeout = timeout )
609- await self ._ping (timeout = 2 )
610- await self ._authenticate (timeout = 5 )
611- await self ._rejoin ()
612- except Exception as e :
613- logging .error (
614- f'Connecting to { host } :{ port } failed: '
615- f'{ e } ({ e .__class__ .__name__ } ), '
616- f'Try next connect in { wait_time } seconds'
617- )
618- else :
619- if protocol and protocol .transport :
620- # make sure the `old` connection will be dropped
621- self ._loop .call_later (10.0 , protocol .transport .close )
622- break
623-
624- await asyncio .sleep (wait_time )
625- wait_time *= 2
626- wait_time = min (wait_time , self .MAX_RECONNECT_WAIT_TIME )
627- timeout = min (timeout + 1 , self .MAX_RECONNECT_TIMEOUT )
602+ try :
603+ wait_time = 1
604+ timeout = 2
605+ protocol = self ._protocol
606+ while True :
607+ host , port = self ._pool [self ._pool_idx ]
608+ try :
609+ await self ._connect (timeout = timeout )
610+ await self ._ping (timeout = 2 )
611+ await self ._authenticate (timeout = 5 )
612+ await self ._rejoin ()
613+ except Exception as e :
614+ logging .error (
615+ f'Connecting to { host } :{ port } failed: '
616+ f'{ e } ({ e .__class__ .__name__ } ), '
617+ f'Try next connect in { wait_time } seconds'
618+ )
619+ else :
620+ if protocol and protocol .transport :
621+ # make sure the `old` connection will be dropped
622+ self ._loop .call_later (10.0 , protocol .transport .close )
623+ break
624+
625+ await asyncio .sleep (wait_time )
626+ wait_time *= 2
627+ wait_time = min (wait_time , self .MAX_RECONNECT_WAIT_TIME )
628+ timeout = min (timeout + 1 , self .MAX_RECONNECT_TIMEOUT )
629+ finally :
630+ self ._reconnecting = False
628631
629632 def _ping (self , timeout ):
630633 return self ._write (Proto .REQ_PING , timeout = timeout )
0 commit comments