Skip to content

🎨 catalog: lifespan managers for fastapi apps #7483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2a76de5
tests concept further
pcrespov Apr 7, 2025
7e287e2
🐛 Fix: Adjust sign handling in timedelta formatting and improve Dynam…
pcrespov Apr 7, 2025
3ce6b36
helpers
pcrespov Apr 7, 2025
8d4b3bf
✨ Refactor: Rename async engine creation function and update references
pcrespov Apr 7, 2025
c491f77
✨ Refactor: Introduce PostgresLifespanStateKeys enum for improved sta…
pcrespov Apr 7, 2025
7532170
✨ Refactor: Rename async engine creation function for clarity and dep…
pcrespov Apr 7, 2025
23af712
✨ Refactor: Rename async engine creation function for consistency and…
pcrespov Apr 7, 2025
8305b9b
✨ Refactor: Deprecate connect_to_db function and recommend postgres_l…
pcrespov Apr 7, 2025
c4fe816
✨ Refactor catalog: Implement application lifespan management with Po…
pcrespov Apr 7, 2025
5c0aa25
✨ Refactor: Ensure async engine disposal in setup_postgres_database w…
pcrespov Apr 7, 2025
4dc9f46
✨ Refactor: Clean up imports in postgres_lifespan.py for consistency
pcrespov Apr 7, 2025
f26338a
✨ Refactor: Enhance Postgres settings validation and update test fixt…
pcrespov Apr 7, 2025
119540c
cleanup
pcrespov Apr 7, 2025
79887d2
✨ Refactor: Simplify application lifespan management by moving logic …
pcrespov Apr 7, 2025
47979b4
✨ Refactor: Update service setup functions to use async iterators for…
pcrespov Apr 7, 2025
05c1a90
✨ Refactor: Integrate Prometheus instrumentation and tracing into app…
pcrespov Apr 7, 2025
4c26ada
✨ Refactor: Update RPC API route setup to use async iterator for impr…
pcrespov Apr 7, 2025
da317b7
✨ Refactor: Update background tasks setup to use context manager for …
pcrespov Apr 7, 2025
adcb226
✨ Refactor: Enhance Prometheus instrumentation integration and update…
pcrespov Apr 7, 2025
2bad4cd
✨ Refactor: Rename postgres lifespan manager for consistency across t…
pcrespov Apr 7, 2025
f8bd194
minor
pcrespov Apr 8, 2025
85703c0
@sanderegg review: config error
pcrespov Apr 8, 2025
2a33aa9
@GitHK review: rename
pcrespov Apr 8, 2025
f7c7f5d
@sanderegg review: renames
pcrespov Apr 8, 2025
b088016
✨ Refactor: Rename lifespan mocks for clarity and consistency in tests
pcrespov Apr 8, 2025
530f4df
minor
pcrespov Apr 8, 2025
1fd7453
fixes fixture
pcrespov Apr 8, 2025
2145c09
✨ Refactor: Update service specifications and remove redundant null s…
pcrespov Apr 8, 2025
0ccb681
fixes test
pcrespov Apr 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def _timedelta_as_minute_second_ms(delta: datetime.timedelta) -> str:
if int(milliseconds * 1000) != 0:
result += f"{int(milliseconds*1000)}ms"

sign = "-" if total_seconds < 0 else ""
sign = "-" if total_seconds < 0 else "<1ms"

return f"{sign}{result.strip()}"

Expand All @@ -32,10 +32,11 @@ class DynamicIndentFormatter(logging.Formatter):
_cls_indent_level: int = 0
_instance_indent_level: int = 0

def __init__(self, fmt=None, datefmt=None, style="%"):
def __init__(self, *args, **kwargs):
fmt = args[0] if args else None
dynamic_fmt = fmt or "%(asctime)s %(levelname)s %(message)s"
assert "message" in dynamic_fmt
super().__init__(dynamic_fmt, datefmt, style)
super().__init__(dynamic_fmt, *args, **kwargs)

def format(self, record) -> str:
original_message = record.msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529
"""


import logging
from typing import Final

Expand All @@ -16,7 +15,7 @@
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
from ..db_asyncpg_utils import create_async_engine_and_database_ready
from ..logging_utils import log_context

APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine"
Expand Down Expand Up @@ -56,7 +55,7 @@ async def connect_to_db(app: web.Application, settings: PostgresSettings) -> Non
"Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s",
f"{settings=}",
):
engine = await create_async_engine_and_pg_database_ready(settings)
engine = await create_async_engine_and_database_ready(settings)
_set_async_engine_to_app_state(app, engine)

_logger.info(
Expand Down
7 changes: 7 additions & 0 deletions packages/service-library/src/servicelib/db_async_engine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import warnings

from fastapi import FastAPI
from settings_library.postgres import PostgresSettings
Expand All @@ -17,6 +18,12 @@

@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
warnings.warn(
"The 'connect_to_db' function is deprecated and will be removed in a future release. "
"Please use 'postgres_lifespan' instead for managing the database connection lifecycle.",
DeprecationWarning,
stacklevel=2,
)
with log_context(
_logger, logging.DEBUG, f"connection to db {settings.dsn_with_async_sqlalchemy}"
):
Expand Down
14 changes: 8 additions & 6 deletions packages/service-library/src/servicelib/db_asyncpg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
from settings_library.postgres import PostgresSettings
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
raise_if_migration_not_ready,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from tenacity import retry
Expand All @@ -17,7 +14,7 @@


@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
async def create_async_engine_and_pg_database_ready(
async def create_async_engine_and_database_ready(
settings: PostgresSettings,
) -> AsyncEngine:
"""
Expand All @@ -26,13 +23,17 @@ async def create_async_engine_and_pg_database_ready(
- waits until db data is migrated (i.e. ready to use)
- returns engine
"""
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
raise_if_migration_not_ready,
)

server_settings = None
if settings.POSTGRES_CLIENT_NAME:
server_settings = {
"application_name": settings.POSTGRES_CLIENT_NAME,
}

engine: AsyncEngine = create_async_engine(
engine = create_async_engine(
settings.dsn_with_async_sqlalchemy,
pool_size=settings.POSTGRES_MINSIZE,
max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,
Expand All @@ -43,9 +44,10 @@ async def create_async_engine_and_pg_database_ready(

try:
await raise_if_migration_not_ready(engine)
except Exception:
except Exception as exc:
# NOTE: engine must be closed because retry will create a new engine
await engine.dispose()
exc.add_note("Failed during migration check. Created engine disposed.")
raise

return engine
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import warnings

from fastapi import FastAPI
from settings_library.postgres import PostgresSettings
Expand All @@ -7,19 +8,26 @@
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
from ..db_asyncpg_utils import create_async_engine_and_database_ready
from ..logging_utils import log_context

_logger = logging.getLogger(__name__)


async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
warnings.warn(
"The 'connect_to_db' function is deprecated and will be removed in a future release. "
"Please use 'postgres_lifespan' instead for managing the database connection lifecycle.",
DeprecationWarning,
stacklevel=2,
)

with log_context(
_logger,
logging.DEBUG,
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
):
engine = await create_async_engine_and_pg_database_ready(settings)
engine = await create_async_engine_and_database_ready(settings)

app.state.engine = engine
_logger.debug(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
from collections.abc import AsyncIterator
from typing import Protocol

from common_library.errors_classes import OsparcErrorMixin
from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State


class LifespanError(OsparcErrorMixin, RuntimeError): ...


class LifespanOnStartupError(LifespanError):
msg_template = "Failed during startup of {module}"


class LifespanOnShutdownError(LifespanError):
msg_template = "Failed during shutdown of {module}"


class LifespanGenerator(Protocol):
def __call__(self, app: FastAPI) -> AsyncIterator["State"]:
...
def __call__(self, app: FastAPI) -> AsyncIterator["State"]: ...


def combine_lifespans(*generators: LifespanGenerator) -> LifespanManager:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncio
import logging
from collections.abc import AsyncIterator
from enum import Enum

from fastapi_lifespan_manager import State
from servicelib.logging_utils import log_catch, log_context
from settings_library.postgres import PostgresSettings
from sqlalchemy.ext.asyncio import AsyncEngine

from ..db_asyncpg_utils import create_async_engine_and_database_ready
from .lifespan_utils import LifespanOnStartupError

_logger = logging.getLogger(__name__)


class PostgresLifespanState(str, Enum):
POSTGRES_SETTINGS = "postgres_settings"
POSTGRES_ASYNC_ENGINE = "postgres.async_engine"


class PostgresConfigurationError(LifespanOnStartupError):
msg_template = "Invalid postgres settings [={settings}] on startup. Note that postgres cannot be disabled using settings"


def create_input_state(settings: PostgresSettings) -> State:
return {PostgresLifespanState.POSTGRES_SETTINGS: settings}


async def postgres_database_lifespan(_, state: State) -> AsyncIterator[State]:

with log_context(_logger, logging.INFO, f"{__name__}"):

settings = state[PostgresLifespanState.POSTGRES_SETTINGS]

if settings is None or not isinstance(settings, PostgresSettings):
raise PostgresConfigurationError(settings=settings)

assert isinstance(settings, PostgresSettings) # nosec

# connect to database
async_engine: AsyncEngine = await create_async_engine_and_database_ready(
settings
)

try:

yield {
PostgresLifespanState.POSTGRES_ASYNC_ENGINE: async_engine,
}

finally:
with log_catch(_logger, reraise=False):
await asyncio.wait_for(async_engine.dispose(), timeout=10)
Loading
Loading