File tree Expand file tree Collapse file tree 3 files changed +27
-14
lines changed
Expand file tree Collapse file tree 3 files changed +27
-14
lines changed Original file line number Diff line number Diff line change 22import logging
33import random
44import ssl
5+ import time
56from collections import defaultdict
67from ssl import SSLContext , PROTOCOL_TLS
78from typing import Optional , Union , Any
@@ -360,6 +361,9 @@ async def _ensure_write(
360361 ) -> asyncio .Future :
361362 if not self ._pool :
362363 raise ConnectionError ('no connection' )
364+
365+ start = time .time ()
366+
363367 while True :
364368 if not self .is_connected ():
365369 logging .info ('Wait for a connection' )
@@ -369,7 +373,13 @@ async def _ensure_write(
369373
370374 try :
371375 res = await self ._protocol .write (tp , data , is_bin , timeout )
372- except (CancelledError , NodeError , AuthError ) as e :
376+ except (asyncio .exceptions .CancelledError ,
377+ CancelledError , NodeError , AuthError ) as e :
378+ if timeout and time .time () - start > timeout :
379+ msg = str (e ) or type (e ).__name__
380+ raise asyncio .TimeoutError (
381+ f'failed to transmit within timeout (error: { msg } )' )
382+
373383 logging .error (
374384 f'Failed to transmit package: '
375385 f'{ e } ({ e .__class__ .__name__ } ) (will try again)' )
Original file line number Diff line number Diff line change @@ -222,6 +222,19 @@ def write(
222222 self ._requests [self ._pid ] = (future , task )
223223 return future
224224
225+ def cancel_requests (self ):
226+ if self ._requests :
227+ logging .error (
228+ f'Canceling { len (self ._requests )} requests '
229+ 'due to a lost connection'
230+ )
231+ while self ._requests :
232+ _key , (future , task ) = self ._requests .popitem ()
233+ if task is not None :
234+ task .cancel ()
235+ if not future .cancelled ():
236+ future .cancel ()
237+
225238 @abstractmethod
226239 def _write (self , data : Any ):
227240 ...
@@ -269,18 +282,7 @@ def connection_lost(self, exc: Exception) -> None:
269282 '''
270283 override asyncio.Protocol
271284 '''
272- if self ._requests :
273- logging .error (
274- f'Canceling { len (self ._requests )} requests '
275- 'due to a lost connection'
276- )
277- while self ._requests :
278- _key , (future , task ) = self ._requests .popitem ()
279- if task is not None :
280- task .cancel ()
281- if not future .cancelled ():
282- future .cancel ()
283-
285+ self .cancel_requests ()
284286 self .close_future .set_result (None )
285287 self .close_future = None
286288 self .transport = None
@@ -380,6 +382,7 @@ async def _recv_loop(self):
380382 self ._handle_package (pkg )
381383
382384 except ConnectionClosed as exc :
385+ self .cancel_requests ()
383386 self ._proto = None
384387 self ._on_connection_lost (self , exc )
385388
Original file line number Diff line number Diff line change 1- __version__ = '1.1.4 '
1+ __version__ = '1.1.5 '
You can’t perform that action at this time.
0 commit comments