diff --git a/CHANGES.txt b/CHANGES.txt index a55c958..360392c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,7 +4,7 @@ Changes 0.3.3 (YYYY-MM-DD) ^^^^^^^^^^^^^^^^^^ * Parameter echo passed properly in cursor #185 - +* Close bad connections before returning back to pool #195 0.3.2 (2018-08-04) ^^^^^^^^^^^^^^^^^^ diff --git a/Makefile b/Makefile index 0c7351d..d9308c8 100644 --- a/Makefile +++ b/Makefile @@ -40,6 +40,8 @@ doc: docker_build: make -C ci build +# NOTE: we start crashing if running tests with -n auto + docker_test: docker run --rm -v /$$(pwd):/aioodbc -v /var/run/docker.sock:/var/run/docker.sock --name aioodbc-test-$$(date +%s) --net=host -e PYTHONASYNCIODEBUG=$(PYTHONASYNCIODEBUG) -it jettify/aioodbc-test:latest py.test -sv tests $(FLAGS) diff --git a/aioodbc/connection.py b/aioodbc/connection.py index 32005f4..2304888 100644 --- a/aioodbc/connection.py +++ b/aioodbc/connection.py @@ -6,7 +6,7 @@ import pyodbc from .cursor import Cursor -from .utils import _ContextManager +from .utils import _ContextManager, _is_conn_close_error __all__ = ['connect', 'Connection'] @@ -158,13 +158,18 @@ async def execute(self, sql, *args): by each call, this should not be used if more than one SQL statement needs to be executed. - :param sql: str, formated sql statement + :param sql: str, formatted sql statement :param args: tuple, arguments for construction of sql statement """ - _cursor = await self._execute(self._conn.execute, sql, *args) - connection = self - cursor = Cursor(_cursor, connection, echo=self._echo) - return cursor + try: + _cursor = await self._execute(self._conn.execute, sql, *args) + connection = self + cursor = Cursor(_cursor, connection, echo=self._echo) + return cursor + except pyodbc.Error as e: + if _is_conn_close_error(e): + await self.close() + raise def getinfo(self, type_): """Returns general information about the driver and data source diff --git a/aioodbc/cursor.py b/aioodbc/cursor.py index 82a4970..6d8bb24 100644 --- a/aioodbc/cursor.py +++ b/aioodbc/cursor.py @@ -1,6 +1,6 @@ import pyodbc from .log import logger -from .utils import PY_352 +from .utils import PY_352, _is_conn_close_error __all__ = ['Cursor'] @@ -21,12 +21,18 @@ def __init__(self, pyodbc_cursor, connection, echo=False): self._loop = connection.loop self._echo = echo - def _run_operation(self, func, *args, **kwargs): + async def _run_operation(self, func, *args, **kwargs): # execute func in thread pool of attached to cursor connection if not self._conn: raise pyodbc.OperationalError('Cursor is closed.') - future = self._conn._execute(func, *args, **kwargs) - return future + + try: + result = await self._conn._execute(func, *args, **kwargs) + return result + except pyodbc.Error as e: + if self._conn and _is_conn_close_error(e): + await self._conn.close() + raise @property def echo(self): @@ -118,6 +124,7 @@ async def execute(self, sql, *params): if self._echo: logger.info(sql) logger.info("%r", sql) + await self._run_operation(self._impl.execute, sql, *params) return self diff --git a/aioodbc/utils.py b/aioodbc/utils.py index 8308bee..9ea1ce4 100644 --- a/aioodbc/utils.py +++ b/aioodbc/utils.py @@ -1,9 +1,37 @@ import sys from collections.abc import Coroutine +from pyodbc import Error + PY_352 = sys.version_info >= (3, 5, 2) +# Issue #195. Don't pollute the pool with bad conns +# Unfortunately occasionally sqlite will return 'HY000' for invalid query, +# so we need specialize the check +_CONN_CLOSE_ERRORS = { + # [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure + '08S01': None, + + # [HY000] server closed the connection unexpectedly + 'HY000': '[HY000] server closed the connection unexpectedly', +} + + +def _is_conn_close_error(e): + if not isinstance(e, Error) or len(e.args) < 2: + return False + + sqlstate, msg = e.args[0], e.args[1] + if sqlstate not in _CONN_CLOSE_ERRORS: + return False + + check_msg = _CONN_CLOSE_ERRORS[sqlstate] + if not check_msg: + return True + + return msg.startswith(check_msg) + class _ContextManager(Coroutine): @@ -51,7 +79,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc, tb): if exc_type: - self._obj.rollback() + await self._obj.rollback() elif not self._obj.autocommit: await self._obj.commit() await self._obj.close() diff --git a/ci/Dockerfile b/ci/Dockerfile index f1f36cd..e0cfdb1 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -1,4 +1,5 @@ -FROM python:3.6.1-slim +# NOTE: stretch was very crashy when using https://dev.mysql.com/get/Downloads/Connector-ODBC/8.0/mysql-connector-odbc-8.0.16-linux-debian9-x86-64bit.tar.gz +FROM python:3.6.8-slim-jessie # configure apt to install minimal dependencies in non-interactive mode. ENV DEBIAN_FRONTEND noninteractive @@ -23,5 +24,4 @@ RUN rm -rf /aioodbc && \ apt-get purge -y wget && \ apt-get autoremove -y -VOLUME /aioodbc WORKDIR /aioodbc diff --git a/requirements-dev.txt b/requirements-dev.txt index c6c13ba..2f37802 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -14,11 +14,13 @@ flake8-pyi==18.3.1 flake8-tuple==0.2.13 ipdb==0.11 ipython==7.2.0 -pyodbc==4.0.25 -pytest==4.0.2 -pytest-asyncio==0.9.0 -pytest-cov==2.6.0 +pyodbc==4.0.26 +pytest==4.6.4 +pytest-asyncio==0.10.0 +pytest-cov==2.7.1 pytest-sugar==0.9.2 +pytest-faulthandler==1.6.0 sphinx==1.8.3 sphinxcontrib-asyncio==0.2.0 -uvloop==0.11.3 +uvloop==0.12.2 +async_generator==1.10 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 81b0ece..37688e3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import asyncio import gc import os +import random import time import uuid from concurrent.futures import ThreadPoolExecutor @@ -11,6 +12,7 @@ import uvloop from aiodocker import Docker +from async_generator import asynccontextmanager @pytest.fixture(scope='session') @@ -32,9 +34,12 @@ def event_loop(loop_type): elif loop_type == 'uvloop': asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.get_event_loop_policy().new_event_loop() - yield loop - gc.collect() - loop.close() + + try: + yield loop + finally: + gc.collect() + loop.close() # alias @@ -46,23 +51,27 @@ def loop(event_loop): @pytest.fixture(scope='session') async def docker(loop): client = Docker() - yield client - await client.close() + + try: + yield client + finally: + await client.close() @pytest.fixture(scope='session') def host(): + # Alternative: host.docker.internal, however not working on travis return os.environ.get('DOCKER_MACHINE_IP', '127.0.0.1') @pytest.fixture async def pg_params(loop, pg_server): - server_info = (pg_server)['pg_params'] + server_info = pg_server['pg_params'] return dict(**server_info) -@pytest.fixture(scope='session') -async def pg_server(loop, host, docker, session_id): +@asynccontextmanager +async def _pg_server_helper(host, docker, session_id): pg_tag = '9.5' await docker.pull('postgres:{}'.format(pg_tag)) @@ -78,41 +87,58 @@ async def pg_server(loop, host, docker, session_id): } ) await container.start() - port = (await container.port(5432))[0]['HostPort'] + container_port = await container.port(5432) + port = container_port[0]['HostPort'] pg_params = { 'database': 'postgres', 'user': 'postgres', 'password': 'mysecretpassword', 'host': host, - 'port': port + 'port': port, } - delay = 0.001 + + start = time.time() dsn = create_pg_dsn(pg_params) last_error = None - for _ in range(100): - try: - conn = pyodbc.connect(dsn) - cur = conn.cursor() - cur.execute("SELECT 1;") - cur.close() - conn.close() - break - except pyodbc.Error as e: - last_error = e - time.sleep(delay) - delay *= 2 - else: - pytest.fail("Cannot start postgres server: {}".format(last_error)) - container_info = { 'port': port, 'pg_params': pg_params, + 'container': container, + 'dsn': dsn, } - yield container_info + try: + while (time.time() - start) < 40: + try: + conn = pyodbc.connect(dsn) + cur = conn.execute("SELECT 1;") + cur.close() + conn.close() + break + except pyodbc.Error as e: + last_error = e + await asyncio.sleep(random.uniform(0.1, 1)) + else: + pytest.fail("Cannot start postgres server: {}".format(last_error)) - await container.kill() - await container.delete(force=True) + yield container_info + finally: + container = container_info['container'] + if container: + await container.kill() + await container.delete(v=True, force=True) + + +@pytest.fixture(scope='session') +async def pg_server(loop, host, docker, session_id): + async with _pg_server_helper(host, docker, session_id) as helper: + yield helper + + +@pytest.fixture +async def pg_server_local(loop, host, docker): + async with _pg_server_helper(host, docker, None) as helper: + yield helper @pytest.fixture @@ -149,42 +175,46 @@ async def mysql_server(loop, host, docker, session_id): 'host': host, 'port': port } - delay = 0.001 dsn = create_mysql_dsn(mysql_params) - last_error = None - for _ in range(100): - try: - conn = pyodbc.connect(dsn) - cur = conn.cursor() - cur.execute("SELECT 1;") - cur.close() - conn.close() - break - except pyodbc.Error as e: - last_error = e - time.sleep(delay) - delay *= 2 - else: - pytest.fail("Cannot start mysql server: {}".format(last_error)) - container_info = { - 'port': port, - 'mysql_params': mysql_params, - } - yield container_info + start = time.time() + try: + last_error = None + while (time.time() - start) < 30: + try: + conn = pyodbc.connect(dsn) + cur = conn.execute("SELECT 1;") + cur.close() + conn.close() + break + except pyodbc.Error as e: + last_error = e + await asyncio.sleep(random.uniform(0.1, 1)) + else: + pytest.fail("Cannot start mysql server: {}".format(last_error)) - await container.kill() - await container.delete(force=True) + container_info = { + 'port': port, + 'mysql_params': mysql_params, + } + + yield container_info + finally: + await container.kill() + await container.delete(v=True, force=True) @pytest.fixture def executor(): executor = ThreadPoolExecutor(max_workers=1) - yield executor - executor.shutdown() + + try: + yield executor + finally: + executor.shutdown(True) -def pytest_namespace(): - return {'db_list': ['pg', 'mysql', 'sqlite']} +def pytest_configure(): + pytest.db_list = ['pg', 'mysql', 'sqlite'] @pytest.fixture @@ -208,7 +238,7 @@ def create_mysql_dsn(mysql_params): @pytest.fixture -def dsn(request, db): +def dsn(tmp_path, request, db): if db == 'pg': pg_params = request.getfixturevalue('pg_params') conf = create_pg_dsn(pg_params) @@ -216,14 +246,16 @@ def dsn(request, db): mysql_params = request.getfixturevalue('mysql_params') conf = create_mysql_dsn(mysql_params) else: - conf = os.environ.get('DSN', 'Driver=SQLite;Database=sqlite.db') + conf = os.environ.get( + 'DSN', f'Driver=SQLite;Database={tmp_path / "sqlite.db"}') + return conf @pytest.fixture async def conn(loop, dsn, connection_maker): connection = await connection_maker() - yield connection + return connection @pytest.fixture @@ -241,21 +273,23 @@ async def make(**kw): cleanup.append((conn, executor)) return conn - yield make - - for conn, executor in cleanup: - await conn.close() - executor.shutdown() + try: + yield make + finally: + for conn, executor in cleanup: + await conn.close() + executor.shutdown(True) @pytest.fixture async def pool(loop, dsn): pool = await aioodbc.create_pool(loop=loop, dsn=dsn) - yield pool - - pool.close() - await pool.wait_closed() + try: + yield pool + finally: + pool.close() + await pool.wait_closed() @pytest.fixture @@ -267,11 +301,12 @@ async def make(loop, **kw): pool_list.append(pool) return pool - yield make - - for pool in pool_list: - pool.close() - await pool.wait_closed() + try: + yield make + finally: + for pool in pool_list: + pool.close() + await pool.wait_closed() @pytest.fixture @@ -284,9 +319,10 @@ async def table(loop, conn): await conn.commit() await cur.close() - yield 't1' - - cur = await conn.cursor() - await cur.execute("DROP TABLE t1;") - await cur.commit() - await cur.close() + try: + yield 't1' + finally: + cur = await conn.cursor() + await cur.execute("DROP TABLE t1;") + await cur.commit() + await cur.close() diff --git a/tests/test_pool.py b/tests/test_pool.py index 2bf628d..33b9912 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -1,4 +1,5 @@ import asyncio +import time import pytest import aioodbc @@ -54,6 +55,30 @@ async def test_release(pool): assert not pool._used +@pytest.mark.asyncio +async def test_op_error_release(loop, pool_maker, pg_server_local): + pool = await pool_maker(loop, dsn=pg_server_local['dsn'], autocommit=True) + + with pytest.raises(Error): + async with pool.acquire() as conn: + async def execute(): + start = time.time() + + while time.time() - start < 20: + await conn.execute('SELECT 1; SELECT pg_sleep(1);') + + async def _kill_conn(): + await asyncio.sleep(2) + await pg_server_local['container'].kill() + await pg_server_local['container'].delete(v=True, force=True) + pg_server_local['container'] = None + + await asyncio.gather(_kill_conn(), execute()) + + assert 9 == pool.freesize + assert not pool._used + + @pytest.mark.asyncio async def test_release_closed(pool): conn = await pool.acquire()