From eaae8c050a0452d3569b17f53d12d3f0d535064f Mon Sep 17 00:00:00 2001 From: egalpin Date: Tue, 30 Jan 2018 11:13:18 -0500 Subject: [PATCH 1/3] Adds consecutive exception tracker to connections --- asyncpg/connect_utils.py | 5 ++++- asyncpg/connection.py | 26 ++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index da616523..20b1acbd 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -38,6 +38,7 @@ 'statement_cache_size', 'max_cached_statement_lifetime', 'max_cacheable_statement_size', + 'max_consecutive_exceptions', ]) @@ -210,6 +211,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, database, timeout, command_timeout, statement_cache_size, max_cached_statement_lifetime, max_cacheable_statement_size, + max_consecutive_exceptions, ssl, server_settings): local_vars = locals() @@ -245,7 +247,8 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, database, command_timeout=command_timeout, statement_cache_size=statement_cache_size, max_cached_statement_lifetime=max_cached_statement_lifetime, - max_cacheable_statement_size=max_cacheable_statement_size,) + max_cacheable_statement_size=max_cacheable_statement_size, + max_consecutive_exceptions=max_consecutive_exceptions,) return addrs, params, config diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 9efd5233..3b58ff6c 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -44,7 +44,7 @@ class Connection(metaclass=ConnectionMeta): '_listeners', '_server_version', '_server_caps', '_intro_query', '_reset_query', '_proxy', '_stmt_exclusive_section', '_config', '_params', '_addr', - '_log_listeners', '_cancellations') + '_log_listeners', '_cancellations', '_consecutive_exceptions') def __init__(self, protocol, transport, loop, addr: (str, int) or str, @@ -97,6 +97,7 @@ def __init__(self, protocol, transport, loop, # Used for `con.fetchval()`, `con.fetch()`, `con.fetchrow()`, # `con.execute()`, and `con.executemany()`. self._stmt_exclusive_section = _Atomic() + self._consecutive_exceptions = 0 async def add_listener(self, channel, callback): """Add a listener for Postgres notifications. @@ -1331,6 +1332,8 @@ async def _do_execute(self, query, executor, timeout, retry=True): # It is not possible to recover (the statement is already done at # the server's side), the only way is to drop our caches and # reraise the exception to the caller. + # + await self._maybe_close_bad_connection() await self.reload_schema_state() raise except exceptions.InvalidCachedStatementError: @@ -1356,15 +1359,27 @@ async def _do_execute(self, query, executor, timeout, retry=True): # and https://github.com/MagicStack/asyncpg/issues/76 # for discussion. # + await self._maybe_close_bad_connection() self._drop_global_statement_cache() if self._protocol.is_in_transaction() or not retry: raise else: return await self._do_execute( query, executor, timeout, retry=False) + except: + logging.warning('exception - maybe closing bad connection') + await self._maybe_close_bad_connection() + raise + self._consecutive_exceptions = 0 return result, stmt + async def _maybe_close_bad_connection(self): + self._consecutive_exceptions += 1 + if self._consecutive_exceptions > \ + self._config.max_consecutive_exceptions: + await self.close() + async def connect(dsn=None, *, host=None, port=None, @@ -1375,6 +1390,7 @@ async def connect(dsn=None, *, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=1024 * 15, + max_consecutive_exceptions=5, command_timeout=None, ssl=None, connection_class=Connection, @@ -1431,6 +1447,11 @@ async def connect(dsn=None, *, default). Pass ``0`` to allow all statements to be cached regardless of their size. + :param int max_consecutive_exceptions: + the maximum number of consecutive exceptions that may be raised by a + single connection before that connection is assumed corrupt (ex. + pointing to an old DB after a failover) + :param float command_timeout: the default timeout for operations on this connection (the default is ``None``: no timeout). @@ -1495,7 +1516,8 @@ class of the returned connection object. Must be a subclass of command_timeout=command_timeout, statement_cache_size=statement_cache_size, max_cached_statement_lifetime=max_cached_statement_lifetime, - max_cacheable_statement_size=max_cacheable_statement_size) + max_cacheable_statement_size=max_cacheable_statement_size, + max_consecutive_exceptions=max_consecutive_exceptions) class _StatementCacheEntry: From 8da94e3c1b7e998ef5b388425b7adfb702185f78 Mon Sep 17 00:00:00 2001 From: egalpin Date: Wed, 31 Jan 2018 15:00:24 -0500 Subject: [PATCH 2/3] Removes errant logging and close calls --- asyncpg/connection.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 3b58ff6c..bb2165cf 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1333,7 +1333,6 @@ async def _do_execute(self, query, executor, timeout, retry=True): # the server's side), the only way is to drop our caches and # reraise the exception to the caller. # - await self._maybe_close_bad_connection() await self.reload_schema_state() raise except exceptions.InvalidCachedStatementError: @@ -1359,7 +1358,6 @@ async def _do_execute(self, query, executor, timeout, retry=True): # and https://github.com/MagicStack/asyncpg/issues/76 # for discussion. # - await self._maybe_close_bad_connection() self._drop_global_statement_cache() if self._protocol.is_in_transaction() or not retry: raise @@ -1367,7 +1365,6 @@ async def _do_execute(self, query, executor, timeout, retry=True): return await self._do_execute( query, executor, timeout, retry=False) except: - logging.warning('exception - maybe closing bad connection') await self._maybe_close_bad_connection() raise From 79317dcc159665f52dabe542d9abc01539182c09 Mon Sep 17 00:00:00 2001 From: egalpin Date: Wed, 31 Jan 2018 15:06:37 -0500 Subject: [PATCH 3/3] Adds setting to disable max_consecutive_exceptions --- asyncpg/connection.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index bb2165cf..bf697b0b 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1372,10 +1372,12 @@ async def _do_execute(self, query, executor, timeout, retry=True): return result, stmt async def _maybe_close_bad_connection(self): - self._consecutive_exceptions += 1 - if self._consecutive_exceptions > \ - self._config.max_consecutive_exceptions: - await self.close() + if self._config.max_consecutive_exceptions > 0: + self._consecutive_exceptions += 1 + + if self._consecutive_exceptions > \ + self._config.max_consecutive_exceptions: + await self.close() async def connect(dsn=None, *, @@ -1387,7 +1389,7 @@ async def connect(dsn=None, *, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=1024 * 15, - max_consecutive_exceptions=5, + max_consecutive_exceptions=0, command_timeout=None, ssl=None, connection_class=Connection, @@ -1447,7 +1449,7 @@ async def connect(dsn=None, *, :param int max_consecutive_exceptions: the maximum number of consecutive exceptions that may be raised by a single connection before that connection is assumed corrupt (ex. - pointing to an old DB after a failover) + pointing to an old DB after a failover). Pass ``0`` to disable. :param float command_timeout: the default timeout for operations on this connection