diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py b/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py index 2bb29562d75..807d9f6b4b7 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py @@ -22,7 +22,7 @@ def _timedelta_as_minute_second_ms(delta: datetime.timedelta) -> str: if int(milliseconds * 1000) != 0: result += f"{int(milliseconds*1000)}ms" - sign = "-" if total_seconds < 0 else "" + sign = "-" if total_seconds < 0 else ("" if result.strip() else "<1ms") return f"{sign}{result.strip()}" @@ -32,10 +32,11 @@ class DynamicIndentFormatter(logging.Formatter): _cls_indent_level: int = 0 _instance_indent_level: int = 0 - def __init__(self, fmt=None, datefmt=None, style="%"): + def __init__(self, *args, **kwargs): + fmt = args[0] if args else None dynamic_fmt = fmt or "%(asctime)s %(levelname)s %(message)s" assert "message" in dynamic_fmt - super().__init__(dynamic_fmt, datefmt, style) + super().__init__(dynamic_fmt, *args[1:], **kwargs) def format(self, record) -> str: original_message = record.msg diff --git a/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py index 1163a479c68..88b0338dadf 100644 --- a/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py +++ b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py @@ -4,7 +4,6 @@ SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529 """ - import logging from typing import Final @@ -16,7 +15,7 @@ ) from sqlalchemy.ext.asyncio import AsyncEngine -from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready +from ..db_asyncpg_utils import create_async_engine_and_database_ready from ..logging_utils import log_context APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine" @@ -56,7 +55,7 @@ async def connect_to_db(app: 
web.Application, settings: PostgresSettings) -> Non "Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s", f"{settings=}", ): - engine = await create_async_engine_and_pg_database_ready(settings) + engine = await create_async_engine_and_database_ready(settings) _set_async_engine_to_app_state(app, engine) _logger.info( diff --git a/packages/service-library/src/servicelib/db_async_engine.py b/packages/service-library/src/servicelib/db_async_engine.py index cff73e77047..dd9166e46fa 100644 --- a/packages/service-library/src/servicelib/db_async_engine.py +++ b/packages/service-library/src/servicelib/db_async_engine.py @@ -1,4 +1,5 @@ import logging +import warnings from fastapi import FastAPI from settings_library.postgres import PostgresSettings @@ -17,6 +18,12 @@ @retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs) async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None: + warnings.warn( + "The 'connect_to_db' function is deprecated and will be removed in a future release. 
" + "Please use 'postgres_lifespan' instead for managing the database connection lifecycle.", + DeprecationWarning, + stacklevel=2, + ) with log_context( _logger, logging.DEBUG, f"connection to db {settings.dsn_with_async_sqlalchemy}" ): diff --git a/packages/service-library/src/servicelib/db_asyncpg_utils.py b/packages/service-library/src/servicelib/db_asyncpg_utils.py index 84430916824..4abbcd3ac66 100644 --- a/packages/service-library/src/servicelib/db_asyncpg_utils.py +++ b/packages/service-library/src/servicelib/db_asyncpg_utils.py @@ -4,9 +4,6 @@ from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult from settings_library.postgres import PostgresSettings -from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear - raise_if_migration_not_ready, -) from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from tenacity import retry @@ -17,7 +14,7 @@ @retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs) -async def create_async_engine_and_pg_database_ready( +async def create_async_engine_and_database_ready( settings: PostgresSettings, ) -> AsyncEngine: """ @@ -26,13 +23,17 @@ async def create_async_engine_and_pg_database_ready( - waits until db data is migrated (i.e. 
ready to use) - returns engine """ + from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear + raise_if_migration_not_ready, + ) + server_settings = None if settings.POSTGRES_CLIENT_NAME: server_settings = { "application_name": settings.POSTGRES_CLIENT_NAME, } - engine: AsyncEngine = create_async_engine( + engine = create_async_engine( settings.dsn_with_async_sqlalchemy, pool_size=settings.POSTGRES_MINSIZE, max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE, @@ -43,9 +44,10 @@ async def create_async_engine_and_pg_database_ready( try: await raise_if_migration_not_ready(engine) - except Exception: + except Exception as exc: # NOTE: engine must be closed because retry will create a new engine await engine.dispose() + exc.add_note("Failed during migration check. Created engine disposed.") raise return engine diff --git a/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py b/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py index 920f68008ae..8f472dc9b51 100644 --- a/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py +++ b/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py @@ -1,4 +1,5 @@ import logging +import warnings from fastapi import FastAPI from settings_library.postgres import PostgresSettings @@ -7,19 +8,26 @@ ) from sqlalchemy.ext.asyncio import AsyncEngine -from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready +from ..db_asyncpg_utils import create_async_engine_and_database_ready from ..logging_utils import log_context _logger = logging.getLogger(__name__) async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None: + warnings.warn( + "The 'connect_to_db' function is deprecated and will be removed in a future release. 
" + "Please use 'postgres_lifespan' instead for managing the database connection lifecycle.", + DeprecationWarning, + stacklevel=2, + ) + with log_context( _logger, logging.DEBUG, f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}", ): - engine = await create_async_engine_and_pg_database_ready(settings) + engine = await create_async_engine_and_database_ready(settings) app.state.engine = engine _logger.debug( diff --git a/packages/service-library/src/servicelib/fastapi/lifespan_utils.py b/packages/service-library/src/servicelib/fastapi/lifespan_utils.py index ee2808078e6..05d70104a17 100644 --- a/packages/service-library/src/servicelib/fastapi/lifespan_utils.py +++ b/packages/service-library/src/servicelib/fastapi/lifespan_utils.py @@ -1,13 +1,24 @@ from collections.abc import AsyncIterator from typing import Protocol +from common_library.errors_classes import OsparcErrorMixin from fastapi import FastAPI from fastapi_lifespan_manager import LifespanManager, State +class LifespanError(OsparcErrorMixin, RuntimeError): ... + + +class LifespanOnStartupError(LifespanError): + msg_template = "Failed during startup of {module}" + + +class LifespanOnShutdownError(LifespanError): + msg_template = "Failed during shutdown of {module}" + + class LifespanGenerator(Protocol): - def __call__(self, app: FastAPI) -> AsyncIterator["State"]: - ... + def __call__(self, app: FastAPI) -> AsyncIterator["State"]: ... 
def combine_lifespans(*generators: LifespanGenerator) -> LifespanManager: diff --git a/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py b/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py new file mode 100644 index 00000000000..def76edd62a --- /dev/null +++ b/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py @@ -0,0 +1,54 @@ +import asyncio +import logging +from collections.abc import AsyncIterator +from enum import Enum + +from fastapi_lifespan_manager import State +from servicelib.logging_utils import log_catch, log_context +from settings_library.postgres import PostgresSettings +from sqlalchemy.ext.asyncio import AsyncEngine + +from ..db_asyncpg_utils import create_async_engine_and_database_ready +from .lifespan_utils import LifespanOnStartupError + +_logger = logging.getLogger(__name__) + + +class PostgresLifespanState(str, Enum): + POSTGRES_SETTINGS = "postgres_settings" + POSTGRES_ASYNC_ENGINE = "postgres.async_engine" + + +class PostgresConfigurationError(LifespanOnStartupError): + msg_template = "Invalid postgres settings [={settings}] on startup. 
Note that postgres cannot be disabled using settings" + + +def create_input_state(settings: PostgresSettings) -> State: + return {PostgresLifespanState.POSTGRES_SETTINGS: settings} + + +async def postgres_database_lifespan(_, state: State) -> AsyncIterator[State]: + + with log_context(_logger, logging.INFO, f"{__name__}"): + + settings = state[PostgresLifespanState.POSTGRES_SETTINGS] + + if settings is None or not isinstance(settings, PostgresSettings): + raise PostgresConfigurationError(settings=settings) + + assert isinstance(settings, PostgresSettings) # nosec + + # connect to database + async_engine: AsyncEngine = await create_async_engine_and_database_ready( + settings + ) + + try: + + yield { + PostgresLifespanState.POSTGRES_ASYNC_ENGINE: async_engine, + } + + finally: + with log_catch(_logger, reraise=False): + await asyncio.wait_for(async_engine.dispose(), timeout=10) diff --git a/packages/service-library/tests/fastapi/test_lifespan_utils.py b/packages/service-library/tests/fastapi/test_lifespan_utils.py index b3619815db8..a89b64603f2 100644 --- a/packages/service-library/tests/fastapi/test_lifespan_utils.py +++ b/packages/service-library/tests/fastapi/test_lifespan_utils.py @@ -1,10 +1,25 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import logging from collections.abc import AsyncIterator +from typing import Any -import asgi_lifespan import pytest +from asgi_lifespan import LifespanManager as ASGILifespanManager from fastapi import FastAPI -from fastapi_lifespan_manager import State -from servicelib.fastapi.lifespan_utils import combine_lifespans +from fastapi_lifespan_manager import LifespanManager, State +from pytest_mock import MockerFixture +from pytest_simcore.helpers.logging_tools import log_context +from servicelib.fastapi.lifespan_utils import ( + LifespanOnShutdownError, + LifespanOnStartupError, + 
combine_lifespans, +) async def test_multiple_lifespan_managers(capsys: pytest.CaptureFixture): @@ -24,7 +39,7 @@ async def cache_lifespan(app: FastAPI) -> AsyncIterator[State]: capsys.readouterr() - async with asgi_lifespan.LifespanManager(app): + async with ASGILifespanManager(app): messages = capsys.readouterr().out assert "setup DB" in messages @@ -38,3 +53,201 @@ async def cache_lifespan(app: FastAPI) -> AsyncIterator[State]: assert "setup CACHE" not in messages assert "shutdown DB" in messages assert "shutdown CACHE" in messages + + +@pytest.fixture +def postgres_lifespan() -> LifespanManager: + lifespan_manager = LifespanManager() + + @lifespan_manager.add + async def _setup_postgres_sync_engine(_) -> AsyncIterator[State]: + with log_context(logging.INFO, "postgres_sync_engine"): + # pass state to children + yield {"postgres": {"engine": "Some Engine"}} + + @lifespan_manager.add + async def _setup_postgres_async_engine(_, state: State) -> AsyncIterator[State]: + with log_context(logging.INFO, "postgres_async_engine"): + # pass state to children + + current = state["postgres"] + yield {"postgres": {"aengine": "Some Async Engine", **current}} + + return lifespan_manager + + +@pytest.fixture +def rabbitmq_lifespan() -> LifespanManager: + lifespan_manager = LifespanManager() + + @lifespan_manager.add + async def _setup_rabbitmq(app: FastAPI) -> AsyncIterator[State]: + with log_context(logging.INFO, "rabbitmq"): + + with pytest.raises(AttributeError, match="rabbitmq_rpc_server"): + _ = app.state.rabbitmq_rpc_server + + # pass state to children + yield {"rabbitmq_rpc_server": "Some RabbitMQ RPC Server"} + + return lifespan_manager + + +async def test_app_lifespan_composition( + postgres_lifespan: LifespanManager, rabbitmq_lifespan: LifespanManager +): + # The app has its own database and rpc-server to initialize + # this is how you connect the lifespans pre-defined in servicelib + + @postgres_lifespan.add + async def database_lifespan(app: FastAPI, state: State) 
-> AsyncIterator[State]: + + with log_context(logging.INFO, "app database"): + assert state["postgres"] == { + "engine": "Some Engine", + "aengine": "Some Async Engine", + } + + with pytest.raises(AttributeError, match="database_engine"): + _ = app.state.database_engine + + app.state.database_engine = state["postgres"]["engine"] + + yield {} # no update + + # tear-down stage + assert app.state.database_engine + + @rabbitmq_lifespan.add + async def rpc_service_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]: + with log_context(logging.INFO, "app rpc-server"): + assert "rabbitmq_rpc_server" in state + + app.state.rpc_server = state["rabbitmq_rpc_server"] + + yield {} + + # Composes lifepans + app_lifespan = LifespanManager() + app_lifespan.include(postgres_lifespan) + app_lifespan.include(rabbitmq_lifespan) + + app = FastAPI(lifespan=app_lifespan) + async with ASGILifespanManager(app) as asgi_manager: + + # asgi_manage state + assert asgi_manager._state == { # noqa: SLF001 + "postgres": { + "engine": "Some Engine", + "aengine": "Some Async Engine", + }, + "rabbitmq_rpc_server": "Some RabbitMQ RPC Server", + } + + # app state + assert app.state.database_engine + assert app.state.rpc_server + + # NOTE: these are different states! + assert app.state._state != asgi_manager._state # noqa: SLF001 + + # Logs shows lifespan execution: + # -> postgres_sync_engine starting ... + # -> postgres_async_engine starting ... + # -> app database starting ... + # -> rabbitmq starting ... + # -> app rpc-server starting ... 
+ # <- app rpc-server done (<1ms) + # <- rabbitmq done (<1ms) + # <- app database done (1ms) + # <- postgres_async_engine done (1ms) + # <- postgres_sync_engine done (1ms) + + +@pytest.fixture +def failing_lifespan_manager(mocker: MockerFixture) -> dict[str, Any]: + startup_step = mocker.MagicMock() + shutdown_step = mocker.MagicMock() + handle_error = mocker.MagicMock() + + def raise_error(): + msg = "failing module" + raise RuntimeError(msg) + + async def lifespan_failing_on_startup(app: FastAPI) -> AsyncIterator[State]: + _name = lifespan_failing_on_startup.__name__ + + with log_context(logging.INFO, _name): + try: + raise_error() + startup_step(_name) + except RuntimeError as exc: + handle_error(_name, exc) + raise LifespanOnStartupError(module=_name) from exc + yield {} + shutdown_step(_name) + + async def lifespan_failing_on_shutdown(app: FastAPI) -> AsyncIterator[State]: + _name = lifespan_failing_on_shutdown.__name__ + + with log_context(logging.INFO, _name): + startup_step(_name) + yield {} + try: + raise_error() + shutdown_step(_name) + except RuntimeError as exc: + handle_error(_name, exc) + raise LifespanOnShutdownError(module=_name) from exc + + return { + "startup_step": startup_step, + "shutdown_step": shutdown_step, + "handle_error": handle_error, + "lifespan_failing_on_startup": lifespan_failing_on_startup, + "lifespan_failing_on_shutdown": lifespan_failing_on_shutdown, + } + + +async def test_app_lifespan_with_error_on_startup( + failing_lifespan_manager: dict[str, Any], +): + app_lifespan = LifespanManager() + app_lifespan.add(failing_lifespan_manager["lifespan_failing_on_startup"]) + app = FastAPI(lifespan=app_lifespan) + + with pytest.raises(LifespanOnStartupError) as err_info: + async with ASGILifespanManager(app): + ... 
+ + exception = err_info.value + assert failing_lifespan_manager["handle_error"].called + assert not failing_lifespan_manager["startup_step"].called + assert not failing_lifespan_manager["shutdown_step"].called + assert exception.error_context() == { + "module": "lifespan_failing_on_startup", + "message": "Failed during startup of lifespan_failing_on_startup", + "code": "RuntimeError.LifespanError.LifespanOnStartupError", + } + + +async def test_app_lifespan_with_error_on_shutdown( + failing_lifespan_manager: dict[str, Any], +): + app_lifespan = LifespanManager() + app_lifespan.add(failing_lifespan_manager["lifespan_failing_on_shutdown"]) + app = FastAPI(lifespan=app_lifespan) + + with pytest.raises(LifespanOnShutdownError) as err_info: + async with ASGILifespanManager(app): + ... + + exception = err_info.value + assert failing_lifespan_manager["handle_error"].called + assert failing_lifespan_manager["startup_step"].called + assert not failing_lifespan_manager["shutdown_step"].called + assert exception.error_context() == { + "module": "lifespan_failing_on_shutdown", + "message": "Failed during shutdown of lifespan_failing_on_shutdown", + "code": "RuntimeError.LifespanError.LifespanOnShutdownError", + } diff --git a/packages/service-library/tests/fastapi/test_postgres_lifespan.py b/packages/service-library/tests/fastapi/test_postgres_lifespan.py new file mode 100644 index 00000000000..0c656c37187 --- /dev/null +++ b/packages/service-library/tests/fastapi/test_postgres_lifespan.py @@ -0,0 +1,172 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +from collections.abc import AsyncIterator +from typing import Annotated, Any + +import pytest +import servicelib.fastapi.postgres_lifespan +from asgi_lifespan import LifespanManager as ASGILifespanManager +from fastapi import FastAPI +from fastapi_lifespan_manager import LifespanManager, 
State +from pydantic import Field +from pytest_mock import MockerFixture, MockType +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.fastapi.postgres_lifespan import ( + PostgresConfigurationError, + PostgresLifespanState, + postgres_database_lifespan, +) +from settings_library.application import BaseApplicationSettings +from settings_library.postgres import PostgresSettings + + +@pytest.fixture +def mock_create_async_engine_and_database_ready(mocker: MockerFixture) -> MockType: + return mocker.patch.object( + servicelib.fastapi.postgres_lifespan, + "create_async_engine_and_database_ready", + return_value=mocker.AsyncMock(), + ) + + +@pytest.fixture +def app_environment(monkeypatch: pytest.MonkeyPatch) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, PostgresSettings.model_json_schema()["examples"][0] + ) + + +@pytest.fixture +def app_lifespan( + app_environment: EnvVarsDict, + mock_create_async_engine_and_database_ready: MockType, +) -> LifespanManager: + assert app_environment + + class AppSettings(BaseApplicationSettings): + CATALOG_POSTGRES: Annotated[ + PostgresSettings, + Field(json_schema_extra={"auto_default_from_env": True}), + ] + + async def my_app_settings(app: FastAPI) -> AsyncIterator[State]: + app.state.settings = AppSettings.create_from_envs() + + yield { + PostgresLifespanState.POSTGRES_SETTINGS: app.state.settings.CATALOG_POSTGRES + } + + async def my_database_setup(app: FastAPI, state: State) -> AsyncIterator[State]: + app.state.my_db_engine = state[PostgresLifespanState.POSTGRES_ASYNC_ENGINE] + + yield {} + + # compose lifespans + app_lifespan = LifespanManager() + app_lifespan.add(my_app_settings) + + # potsgres + app_lifespan.add(postgres_database_lifespan) + app_lifespan.add(my_database_setup) + + return app_lifespan + + +async def test_lifespan_postgres_database_in_an_app( + is_pdb_enabled: bool, + app_environment: EnvVarsDict, + 
mock_create_async_engine_and_database_ready: MockType, + app_lifespan: LifespanManager, +): + + app = FastAPI(lifespan=app_lifespan) + + async with ASGILifespanManager( + app, + startup_timeout=None if is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ) as asgi_manager: + # Verify that the async engine was created + mock_create_async_engine_and_database_ready.assert_called_once_with( + app.state.settings.CATALOG_POSTGRES + ) + + # Verify that the async engine is in the lifespan manager state + assert ( + PostgresLifespanState.POSTGRES_ASYNC_ENGINE + in asgi_manager._state # noqa: SLF001 + ) + assert app.state.my_db_engine + assert ( + app.state.my_db_engine + == asgi_manager._state[ # noqa: SLF001 + PostgresLifespanState.POSTGRES_ASYNC_ENGINE + ] + ) + + assert ( + app.state.my_db_engine + == mock_create_async_engine_and_database_ready.return_value + ) + + # Verify that the engine was disposed + async_engine: Any = mock_create_async_engine_and_database_ready.return_value + async_engine.dispose.assert_called_once() + + +async def test_lifespan_postgres_database_dispose_engine_on_failure( + is_pdb_enabled: bool, + app_environment: EnvVarsDict, + mock_create_async_engine_and_database_ready: MockType, + app_lifespan: LifespanManager, +): + expected_msg = "my_faulty_lifespan error" + + def raise_error(): + raise RuntimeError(expected_msg) + + @app_lifespan.add + async def my_faulty_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]: + assert PostgresLifespanState.POSTGRES_ASYNC_ENGINE in state + raise_error() + yield {} + + app = FastAPI(lifespan=app_lifespan) + + with pytest.raises(RuntimeError, match=expected_msg): + async with ASGILifespanManager( + app, + startup_timeout=None if is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ): + ... 
+ + # Verify that the engine was disposed even if error happend + async_engine: Any = mock_create_async_engine_and_database_ready.return_value + async_engine.dispose.assert_called_once() + + +async def test_setup_postgres_database_with_empty_pg_settings( + is_pdb_enabled: bool, +): + async def my_app_settings(app: FastAPI) -> AsyncIterator[State]: + yield {PostgresLifespanState.POSTGRES_SETTINGS: None} + + app_lifespan = LifespanManager() + app_lifespan.add(my_app_settings) + app_lifespan.add(postgres_database_lifespan) + + app = FastAPI(lifespan=app_lifespan) + + with pytest.raises(PostgresConfigurationError, match="postgres cannot be disabled"): + async with ASGILifespanManager( + app, + startup_timeout=None if is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ): + ... diff --git a/scripts/maintenance/requirements.txt b/scripts/maintenance/requirements.txt index 9b44c996137..4c521ce0a8c 100644 --- a/scripts/maintenance/requirements.txt +++ b/scripts/maintenance/requirements.txt @@ -11,4 +11,4 @@ pylint python-dateutil python-dotenv tenacity -typer[all] +typer diff --git a/services/catalog/src/simcore_service_catalog/api/rpc/routes.py b/services/catalog/src/simcore_service_catalog/api/rpc/routes.py index ce35e7867d1..3c2310c2680 100644 --- a/services/catalog/src/simcore_service_catalog/api/rpc/routes.py +++ b/services/catalog/src/simcore_service_catalog/api/rpc/routes.py @@ -1,6 +1,8 @@ import logging +from collections.abc import AsyncIterator from fastapi import FastAPI +from fastapi_lifespan_manager import State from models_library.api_schemas_catalog import CATALOG_RPC_NAMESPACE from ...services.rabbitmq import get_rabbitmq_rpc_server @@ -9,9 +11,11 @@ _logger = logging.getLogger(__name__) -def setup_rpc_api_routes(app: FastAPI) -> None: - async def _on_startup() -> None: - rpc_server = get_rabbitmq_rpc_server(app) - await rpc_server.register_router(_services.router, CATALOG_RPC_NAMESPACE, app) - - app.add_event_handler("startup", 
_on_startup) +async def rpc_api_lifespan(app: FastAPI) -> AsyncIterator[State]: + rpc_server = get_rabbitmq_rpc_server(app) + await rpc_server.register_router(_services.router, CATALOG_RPC_NAMESPACE, app) + try: + yield {} + finally: + # No specific cleanup required for now + pass diff --git a/services/catalog/src/simcore_service_catalog/core/application.py b/services/catalog/src/simcore_service_catalog/core/application.py index 7bedab76a31..15762c09226 100644 --- a/services/catalog/src/simcore_service_catalog/core/application.py +++ b/services/catalog/src/simcore_service_catalog/core/application.py @@ -5,20 +5,22 @@ from models_library.basic_types import BootModeEnum from servicelib.fastapi import timing_middleware from servicelib.fastapi.openapi import override_fastapi_openapi_method -from servicelib.fastapi.profiler import initialize_profiler from servicelib.fastapi.prometheus_instrumentation import ( - setup_prometheus_instrumentation, + initialize_prometheus_instrumentation, ) from servicelib.fastapi.tracing import initialize_tracing from starlette.middleware.base import BaseHTTPMiddleware -from .._meta import API_VERSION, API_VTAG, APP_NAME, PROJECT_NAME, SUMMARY +from .._meta import ( + API_VERSION, + API_VTAG, + APP_NAME, + PROJECT_NAME, + SUMMARY, +) from ..api.rest.routes import setup_rest_api_routes -from ..api.rpc.routes import setup_rpc_api_routes from ..exceptions.handlers import setup_exception_handlers -from ..services.function_services import setup_function_services -from ..services.rabbitmq import setup_rabbitmq -from .events import create_on_shutdown, create_on_startup +from . 
import events from .settings import ApplicationSettings _logger = logging.getLogger(__name__) @@ -34,7 +36,7 @@ ) -def create_app(settings: ApplicationSettings | None = None) -> FastAPI: +def create_app() -> FastAPI: # keep mostly quiet noisy loggers quiet_level: int = max( min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING @@ -42,10 +44,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI: for name in _NOISY_LOGGERS: logging.getLogger(name).setLevel(quiet_level) - if settings is None: - settings = ApplicationSettings.create_from_envs() - - assert settings # nosec + settings = ApplicationSettings.create_from_envs() _logger.debug(settings.model_dump_json(indent=2)) app = FastAPI( @@ -57,28 +56,19 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI: openapi_url=f"/api/{API_VTAG}/openapi.json", docs_url="/dev/doc", redoc_url=None, # default disabled + lifespan=events.create_app_lifespan(), ) override_fastapi_openapi_method(app) # STATE app.state.settings = settings + # MIDDLEWARES if settings.CATALOG_TRACING: initialize_tracing(app, settings.CATALOG_TRACING, APP_NAME) - # STARTUP-EVENT - app.add_event_handler("startup", create_on_startup(app)) - - # PLUGIN SETUP - setup_function_services(app) - setup_rabbitmq(app) - - if app.state.settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED: - setup_prometheus_instrumentation(app) - - # MIDDLEWARES - if app.state.settings.CATALOG_PROFILING: - initialize_profiler(app) + if settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED: + initialize_prometheus_instrumentation(app) if settings.SC_BOOT_MODE != BootModeEnum.PRODUCTION: # middleware to time requests (ONLY for development) @@ -90,10 +80,6 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI: # ROUTES setup_rest_api_routes(app, vtag=API_VTAG) - setup_rpc_api_routes(app) - - # SHUTDOWN-EVENT - app.add_event_handler("shutdown", create_on_shutdown(app)) # EXCEPTIONS 
setup_exception_handlers(app) diff --git a/services/catalog/src/simcore_service_catalog/core/background_tasks.py b/services/catalog/src/simcore_service_catalog/core/background_tasks.py index cb269ee3919..dfd5365b562 100644 --- a/services/catalog/src/simcore_service_catalog/core/background_tasks.py +++ b/services/catalog/src/simcore_service_catalog/core/background_tasks.py @@ -11,11 +11,13 @@ import asyncio import logging +from collections.abc import AsyncIterator from contextlib import suppress from pprint import pformat from typing import Final from fastapi import FastAPI, HTTPException +from fastapi_lifespan_manager import State from models_library.services import ServiceMetaDataPublished from models_library.services_types import ServiceKey, ServiceVersion from packaging.version import Version @@ -115,9 +117,9 @@ async def _ensure_registry_and_database_are_synced(app: FastAPI) -> None: director_api = get_director_api(app) services_in_manifest_map = await manifest.get_services_map(director_api) - services_in_db: set[ - tuple[ServiceKey, ServiceVersion] - ] = await _list_services_in_database(app.state.engine) + services_in_db: set[tuple[ServiceKey, ServiceVersion]] = ( + await _list_services_in_database(app.state.engine) + ) # check that the db has all the services at least once missing_services_in_db = set(services_in_manifest_map.keys()) - services_in_db @@ -232,3 +234,11 @@ async def stop_registry_sync_task(app: FastAPI) -> None: await task app.state.registry_sync_task = None _logger.info("registry syncing task stopped") + + +async def background_task_lifespan(app: FastAPI) -> AsyncIterator[State]: + await start_registry_sync_task(app) + try: + yield {} + finally: + await stop_registry_sync_task(app) diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index f22adbba4ec..097a4ab4113 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ 
b/services/catalog/src/simcore_service_catalog/core/events.py @@ -1,65 +1,88 @@ import logging -from collections.abc import Awaitable, Callable -from typing import TypeAlias +from collections.abc import AsyncIterator from fastapi import FastAPI -from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db -from servicelib.logging_utils import log_context +from fastapi_lifespan_manager import LifespanManager, State +from servicelib.fastapi.postgres_lifespan import ( + PostgresLifespanState, + postgres_database_lifespan, +) +from servicelib.fastapi.prometheus_instrumentation import ( + lifespan_prometheus_instrumentation, +) from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG -from ..db.events import setup_default_product -from ..services.director import close_director, setup_director -from .background_tasks import start_registry_sync_task, stop_registry_sync_task +from ..api.rpc.routes import rpc_api_lifespan +from ..db.events import database_lifespan +from ..services.director import director_lifespan +from ..services.function_services import function_services_lifespan +from ..services.rabbitmq import rabbitmq_lifespan +from .background_tasks import background_task_lifespan +from .settings import ApplicationSettings _logger = logging.getLogger(__name__) -EventCallable: TypeAlias = Callable[[], Awaitable[None]] - - def _flush_started_banner() -> None: # WARNING: this function is spied in the tests print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201 def _flush_finished_banner() -> None: + # WARNING: this function is spied in the tests print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201 -def create_on_startup(app: FastAPI) -> EventCallable: - async def _() -> None: - _flush_started_banner() +async def _banners_lifespan(_) -> AsyncIterator[State]: + _flush_started_banner() + yield {} + _flush_finished_banner() + + +async def _main_lifespan(app: FastAPI) -> AsyncIterator[State]: + settings: ApplicationSettings = 
app.state.settings + + yield { + PostgresLifespanState.POSTGRES_SETTINGS: settings.CATALOG_POSTGRES, + "prometheus_instrumentation_enabled": settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED, + } + + +async def _prometheus_instrumentation_lifespan( + app: FastAPI, state: State +) -> AsyncIterator[State]: + if state.get("prometheus_instrumentation_enabled", False): + async for prometheus_state in lifespan_prometheus_instrumentation(app): + yield prometheus_state + - # setup connection to pg db - if app.state.settings.CATALOG_POSTGRES: - await connect_to_db(app, app.state.settings.CATALOG_POSTGRES) - await setup_default_product(app) +def create_app_lifespan(): + # WARNING: order matters + app_lifespan = LifespanManager() + app_lifespan.add(_main_lifespan) - if app.state.settings.CATALOG_DIRECTOR: - # setup connection to director - await setup_director(app) + # - postgres + app_lifespan.add(postgres_database_lifespan) + app_lifespan.add(database_lifespan) - # FIXME: check director service is in place and ready. Hand-shake?? 
- # SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728 - await start_registry_sync_task(app) + # - rabbitmq + app_lifespan.add(rabbitmq_lifespan) - _logger.info("Application started") + # - rpc api routes + app_lifespan.add(rpc_api_lifespan) - return _ + # - director + app_lifespan.add(director_lifespan) + # - function services + app_lifespan.add(function_services_lifespan) -def create_on_shutdown(app: FastAPI) -> EventCallable: - async def _() -> None: + # - background task + app_lifespan.add(background_task_lifespan) - with log_context(_logger, logging.INFO, "Application shutdown"): - if app.state.settings.CATALOG_DIRECTOR: - try: - await stop_registry_sync_task(app) - await close_director(app) - await close_db_connection(app) - except Exception: # pylint: disable=broad-except - _logger.exception("Unexpected error while closing application") + # - prometheus instrumentation + app_lifespan.add(_prometheus_instrumentation_lifespan) - _flush_finished_banner() + app_lifespan.add(_banners_lifespan) - return _ + return app_lifespan diff --git a/services/catalog/src/simcore_service_catalog/core/settings.py b/services/catalog/src/simcore_service_catalog/core/settings.py index eba2176bc81..5581bf4ba99 100644 --- a/services/catalog/src/simcore_service_catalog/core/settings.py +++ b/services/catalog/src/simcore_service_catalog/core/settings.py @@ -46,9 +46,9 @@ def base_url(self) -> str: ) -_DEFAULT_SERVICE_SPECIFICATIONS: Final[ - ServiceSpecifications -] = ServiceSpecifications.model_validate({}) +_DEFAULT_SERVICE_SPECIFICATIONS: Final[ServiceSpecifications] = ( + ServiceSpecifications.model_validate({}) +) class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): @@ -88,7 +88,7 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): ] = False CATALOG_POSTGRES: Annotated[ - PostgresSettings | None, + PostgresSettings, Field(json_schema_extra={"auto_default_from_env": True}), ] @@ -102,7 +102,7 @@ class 
ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): ] CATALOG_DIRECTOR: Annotated[ - DirectorSettings | None, + DirectorSettings, Field(json_schema_extra={"auto_default_from_env": True}), ] diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index 42de4c38620..950f67e373f 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -1,12 +1,20 @@ import logging +from collections.abc import AsyncIterator from fastapi import FastAPI +from fastapi_lifespan_manager import State +from servicelib.fastapi.postgres_lifespan import PostgresLifespanState from .repositories.products import ProductsRepository _logger = logging.getLogger(__name__) -async def setup_default_product(app: FastAPI): +async def database_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]: + app.state.engine = state[PostgresLifespanState.POSTGRES_ASYNC_ENGINE] + repo = ProductsRepository(db_engine=app.state.engine) + app.state.default_product_name = await repo.get_default_product_name() + + yield {} diff --git a/services/catalog/src/simcore_service_catalog/services/director.py b/services/catalog/src/simcore_service_catalog/services/director.py index 52584062184..7560c622317 100644 --- a/services/catalog/src/simcore_service_catalog/services/director.py +++ b/services/catalog/src/simcore_service_catalog/services/director.py @@ -3,7 +3,7 @@ import json import logging import urllib.parse -from collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from contextlib import suppress from pprint import pformat from typing import Any, Final @@ -11,6 +11,7 @@ import httpx from common_library.json_serialization import json_dumps from fastapi import FastAPI, HTTPException +from fastapi_lifespan_manager import State from models_library.api_schemas_directorv2.services import 
ServiceExtras from models_library.services_metadata_published import ServiceMetaDataPublished from models_library.services_types import ServiceKey, ServiceVersion @@ -65,7 +66,7 @@ def _validate_kind(entry_to_validate: dict[str, Any], kind_name: str): def _return_data_or_raise_error( - request_func: Callable[..., Awaitable[httpx.Response]] + request_func: Callable[..., Awaitable[httpx.Response]], ) -> Callable[..., Awaitable[list[Any] | dict[str, Any]]]: """ Creates a context for safe inter-process communication (IPC) @@ -288,13 +289,14 @@ async def get_service_extras( return TypeAdapter(ServiceExtras).validate_python(result) -async def setup_director(app: FastAPI) -> None: +async def director_lifespan(app: FastAPI) -> AsyncIterator[State]: + client: DirectorApi | None = None + if settings := app.state.settings.CATALOG_DIRECTOR: with log_context( _logger, logging.DEBUG, "Setup director at %s", f"{settings.base_url=}" ): async for attempt in AsyncRetrying(**_director_startup_retry_policy): - client = DirectorApi(base_url=settings.base_url, app=app) with attempt: client = DirectorApi(base_url=settings.base_url, app=app) if not await client.is_responsive(): @@ -303,17 +305,16 @@ async def setup_director(app: FastAPI) -> None: raise DirectorUnresponsiveError _logger.info( - "Connection to director-v0 succeded [%s]", + "Connection to director-v0 succeeded [%s]", json_dumps(attempt.retry_state.retry_object.statistics), ) # set when connected app.state.director_api = client - -async def close_director(app: FastAPI) -> None: - client: DirectorApi | None - if client := app.state.director_api: - await client.close() - - _logger.debug("Director client closed successfully") + try: + yield {} + finally: + if client: + await client.close() + _logger.debug("Director client closed successfully") diff --git a/services/catalog/src/simcore_service_catalog/services/function_services.py b/services/catalog/src/simcore_service_catalog/services/function_services.py index 
7ed546f251b..c5f326fec4b 100644 --- a/services/catalog/src/simcore_service_catalog/services/function_services.py +++ b/services/catalog/src/simcore_service_catalog/services/function_services.py @@ -1,9 +1,12 @@ +from collections.abc import AsyncIterator + # mypy: disable-error-code=truthy-function from typing import Any from fastapi import status from fastapi.applications import FastAPI from fastapi.exceptions import HTTPException +from fastapi_lifespan_manager import State from models_library.function_services_catalog import ( is_function_service, iter_service_docker_data, @@ -31,16 +34,19 @@ def get_function_service(key, version) -> ServiceMetaDataPublished: ) from err -def setup_function_services(app: FastAPI): - def _on_startup() -> None: - catalog = [_as_dict(metadata) for metadata in iter_service_docker_data()] - app.state.frontend_services_catalog = catalog +async def function_services_lifespan(app: FastAPI) -> AsyncIterator[State]: + app.state.frontend_services_catalog = [ + _as_dict(metadata) for metadata in iter_service_docker_data() + ] - app.add_event_handler("startup", _on_startup) + try: + yield {} + finally: + app.state.frontend_services_catalog = None __all__: tuple[str, ...] 
= ( "get_function_service", "is_function_service", - "setup_function_services", + "function_services_lifespan", ) diff --git a/services/catalog/src/simcore_service_catalog/services/rabbitmq.py b/services/catalog/src/simcore_service_catalog/services/rabbitmq.py index 8400885efa0..ba46173b647 100644 --- a/services/catalog/src/simcore_service_catalog/services/rabbitmq.py +++ b/services/catalog/src/simcore_service_catalog/services/rabbitmq.py @@ -1,7 +1,9 @@ import logging +from collections.abc import AsyncIterator from typing import cast from fastapi import FastAPI +from fastapi_lifespan_manager import State from servicelib.rabbitmq import RabbitMQRPCClient, wait_till_rabbitmq_responsive from settings_library.rabbit import RabbitSettings @@ -15,25 +17,21 @@ def get_rabbitmq_settings(app: FastAPI) -> RabbitSettings: return settings -def setup_rabbitmq(app: FastAPI) -> None: +async def rabbitmq_lifespan(app: FastAPI) -> AsyncIterator[State]: settings: RabbitSettings = get_rabbitmq_settings(app) - app.state.rabbitmq_rpc_server = None + await wait_till_rabbitmq_responsive(settings.dsn) - async def _on_startup() -> None: - await wait_till_rabbitmq_responsive(settings.dsn) + app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( + client_name=f"{PROJECT_NAME}_rpc_server", settings=settings + ) - app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( - client_name=f"{PROJECT_NAME}_rpc_server", settings=settings - ) - - async def _on_shutdown() -> None: + try: + yield {} + finally: if app.state.rabbitmq_rpc_server: await app.state.rabbitmq_rpc_server.close() app.state.rabbitmq_rpc_server = None - app.add_event_handler("startup", _on_startup) - app.add_event_handler("shutdown", _on_shutdown) - def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient: assert app.state.rabbitmq_rpc_server # nosec diff --git a/services/catalog/tests/unit/conftest.py b/services/catalog/tests/unit/conftest.py index 4b0000cbac4..02a027f63f5 100644 --- 
a/services/catalog/tests/unit/conftest.py +++ b/services/catalog/tests/unit/conftest.py @@ -15,6 +15,7 @@ import pytest import respx import simcore_service_catalog +import simcore_service_catalog.core.application import simcore_service_catalog.core.events import yaml from asgi_lifespan import LifespanManager @@ -139,7 +140,7 @@ async def app( # create instance assert app_environment - app_under_test = create_app(settings=app_settings) + app_under_test = create_app() assert spy_app.on_startup.call_count == 0 assert spy_app.on_shutdown.call_count == 0 @@ -166,7 +167,7 @@ def client( # create instance assert app_environment - app_under_test = create_app(settings=app_settings) + app_under_test = create_app() assert ( spy_app.on_startup.call_count == 0 @@ -199,7 +200,7 @@ async def aclient( headers={"Content-Type": "application/json"}, transport=httpx.ASGITransport(app=app), ) as acli: - assert isinstance(acli._transport, httpx.ASGITransport) + assert isinstance(acli._transport, httpx.ASGITransport) # noqa: SLF001 assert spy_app.on_startup.call_count == 1 assert spy_app.on_shutdown.call_count == 0 @@ -215,32 +216,37 @@ def service_caching_disabled(monkeypatch: pytest.MonkeyPatch) -> None: @pytest.fixture -def postgres_setup_disabled(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setenv("CATALOG_POSTGRES", "null") +def postgres_setup_disabled(mocker: MockerFixture): + mocker.patch.object( + simcore_service_catalog.core.events, "postgres_database_lifespan" + ) + mocker.patch.object(simcore_service_catalog.core.events, "database_lifespan") @pytest.fixture def background_tasks_setup_disabled(mocker: MockerFixture) -> None: """patch the setup of the background task so we can call it manually""" - def _factory(name): - async def _side_effect(app: FastAPI): - assert app + class MockedBackgroundTaskContextManager: + async def __aenter__(self): print( "TEST", background_tasks_setup_disabled.__name__, - "Disabled background tasks. 
Skipping execution of", - name, + "Disabled background tasks. Skipping execution of __aenter__", ) - return _side_effect + async def __aexit__(self, exc_type, exc_value, traceback): + print( + "TEST", + background_tasks_setup_disabled.__name__, + "Disabled background tasks. Skipping execution of __aexit__", + ) - for name in ("start_registry_sync_task", "stop_registry_sync_task"): - mocker.patch( - f"simcore_service_catalog.core.events.{name}", - side_effect=_factory(name), - autospec=True, - ) + mocker.patch.object( + simcore_service_catalog.core.events, + "background_task_lifespan", + return_value=MockedBackgroundTaskContextManager(), + ) # @@ -251,8 +257,8 @@ async def _side_effect(app: FastAPI): @pytest.fixture def rabbitmq_and_rpc_setup_disabled(mocker: MockerFixture): # The following services are affected if rabbitmq is not in place - mocker.patch("simcore_service_catalog.core.application.setup_rabbitmq") - mocker.patch("simcore_service_catalog.core.application.setup_rpc_api_routes") + mocker.patch.object(simcore_service_catalog.core.events, "rabbitmq_lifespan") + mocker.patch.object(simcore_service_catalog.core.events, "rpc_api_lifespan") @pytest.fixture @@ -268,8 +274,8 @@ async def rpc_client( @pytest.fixture -def director_setup_disabled(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setenv("CATALOG_DIRECTOR", "null") +def director_setup_disabled(mocker: MockerFixture) -> None: + mocker.patch.object(simcore_service_catalog.core.events, "director_lifespan") @pytest.fixture diff --git a/services/catalog/tests/unit/test_services_director.py b/services/catalog/tests/unit/test_services_director.py index eb36988b519..94998f74749 100644 --- a/services/catalog/tests/unit/test_services_director.py +++ b/services/catalog/tests/unit/test_services_director.py @@ -27,13 +27,13 @@ def app_environment( monkeypatch, { **app_environment, - "CATALOG_POSTGRES": "null", # disable postgres "SC_BOOT_MODE": "local-development", }, ) async def 
test_director_client_high_level_api( + postgres_setup_disabled: None, background_tasks_setup_disabled: None, rabbitmq_and_rpc_setup_disabled: None, expected_director_list_services: list[dict[str, Any]], @@ -59,6 +59,7 @@ async def test_director_client_high_level_api( async def test_director_client_low_level_api( + postgres_setup_disabled: None, background_tasks_setup_disabled: None, rabbitmq_and_rpc_setup_disabled: None, mocked_director_service_api: MockRouter, diff --git a/services/catalog/tests/unit/test_services_manifest.py b/services/catalog/tests/unit/test_services_manifest.py index a43d82d5220..67b9b03e2e3 100644 --- a/services/catalog/tests/unit/test_services_manifest.py +++ b/services/catalog/tests/unit/test_services_manifest.py @@ -26,13 +26,13 @@ def app_environment( monkeypatch, { **app_environment, - "CATALOG_POSTGRES": "null", # disable postgres "SC_BOOT_MODE": "local-development", }, ) async def test_services_manifest_api( + postgres_setup_disabled: None, rabbitmq_and_rpc_setup_disabled: None, mocked_director_service_api: MockRouter, app: FastAPI, diff --git a/services/catalog/tests/unit/with_dbs/test_db_repositories.py b/services/catalog/tests/unit/with_dbs/test_db_repositories.py index 8618a67c25e..40a4f081764 100644 --- a/services/catalog/tests/unit/with_dbs/test_db_repositories.py +++ b/services/catalog/tests/unit/with_dbs/test_db_repositories.py @@ -487,7 +487,7 @@ async def test_get_service_history_page( user_id: UserID, ): # inject services with multiple versions - service_key = "simcore/services/dynamic/test-service" + service_key = "simcore/services/dynamic/test-some-service" num_versions = 10 release_versions = [ diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py b/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py index 188117d9c93..83fe1cf9204 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py +++ 
b/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py @@ -1,7 +1,9 @@ import logging from fastapi import FastAPI -from servicelib.db_asyncpg_utils import create_async_engine_and_pg_database_ready +from servicelib.db_asyncpg_utils import ( + create_async_engine_and_database_ready, +) from servicelib.logging_utils import log_context from settings_library.postgres import PostgresSettings from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo @@ -15,7 +17,7 @@ async def asyncpg_connect_to_db(app: FastAPI, settings: PostgresSettings) -> Non with log_context( logging.DEBUG, - f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}", + f"Connecting and migrating {settings.dsn_with_async_sqlalchemy}", ): - engine = await create_async_engine_and_pg_database_ready(settings) + engine = await create_async_engine_and_database_ready(settings) app.state.asyncpg_engine = engine _logger.debug(