Skip to content

Commit bb5300c

Browse files
committed
Merge branch 'main' into reformat-show-api-request
2 parents 50b7628 + 6043281 commit bb5300c

File tree

2 files changed

+77
-14
lines changed

2 files changed

+77
-14
lines changed

cads_processing_api_service/clients.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ def post_process_execution(
202202
job_kwargs = adaptors.make_system_job_kwargs(
203203
resource, execution_content, adaptor.resources
204204
)
205-
compute_sessionmaker = db_utils.get_compute_sessionmaker()
205+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
206+
mode=db_utils.ConnectionMode.write
207+
)
206208
with compute_sessionmaker() as compute_session:
207209
job = cads_broker.database.create_request(
208210
session=compute_session,
@@ -284,7 +286,9 @@ def get_jobs(
284286
statement, self.job_table, back, sort_key, sort_dir
285287
)
286288
statement = utils.apply_limit(statement, limit)
287-
compute_sessionmaker = db_utils.get_compute_sessionmaker()
289+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
290+
mode=db_utils.ConnectionMode.read
291+
)
288292
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker()
289293
with compute_sessionmaker() as compute_session:
290294
job_entries = compute_session.scalars(statement).all()
@@ -351,9 +355,22 @@ def get_job(
351355
"""
352356
user_uid = auth.authenticate_user(auth_header, portal_header)
353357
portals = [p.strip() for p in portal_header.split(",")]
354-
compute_sessionmaker = db_utils.get_compute_sessionmaker()
355-
with compute_sessionmaker() as compute_session:
356-
job = utils.get_job_from_broker_db(job_id=job_id, session=compute_session)
358+
try:
359+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
360+
mode=db_utils.ConnectionMode.read
361+
)
362+
with compute_sessionmaker() as compute_session:
363+
job = utils.get_job_from_broker_db(
364+
job_id=job_id, session=compute_session
365+
)
366+
except ogc_api_processes_fastapi.exceptions.NoSuchJob:
367+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
368+
mode=db_utils.ConnectionMode.write
369+
)
370+
with compute_sessionmaker() as compute_session:
371+
job = utils.get_job_from_broker_db(
372+
job_id=job_id, session=compute_session
373+
)
357374
if job["portal"] not in portals:
358375
raise ogc_api_processes_fastapi.exceptions.NoSuchJob()
359376
auth.verify_permission(user_uid, job)
@@ -407,12 +424,34 @@ def get_job_results(
407424
structlog.contextvars.bind_contextvars(job_id=job_id)
408425
user_uid = auth.authenticate_user(auth_header, portal_header)
409426
structlog.contextvars.bind_contextvars(user_id=user_uid)
410-
compute_sessionmaker = db_utils.get_compute_sessionmaker()
411-
with compute_sessionmaker() as compute_session:
412-
job = utils.get_job_from_broker_db(job_id=job_id, session=compute_session)
413-
auth.verify_permission(user_uid, job)
414-
results = utils.get_results_from_broker_db(job=job, session=compute_session)
415-
handle_download_metrics(job, results)
427+
try:
428+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
429+
mode=db_utils.ConnectionMode.read
430+
)
431+
with compute_sessionmaker() as compute_session:
432+
job = utils.get_job_from_broker_db(
433+
job_id=job_id, session=compute_session
434+
)
435+
auth.verify_permission(user_uid, job)
436+
results = utils.get_results_from_broker_db(
437+
job=job, session=compute_session
438+
)
439+
except (
440+
ogc_api_processes_fastapi.exceptions.NoSuchJob,
441+
ogc_api_processes_fastapi.exceptions.ResultsNotReady,
442+
):
443+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
444+
mode=db_utils.ConnectionMode.write
445+
)
446+
with compute_sessionmaker() as compute_session:
447+
job = utils.get_job_from_broker_db(
448+
job_id=job_id, session=compute_session
449+
)
450+
auth.verify_permission(user_uid, job)
451+
results = utils.get_results_from_broker_db(
452+
job=job, session=compute_session
453+
)
454+
handle_download_metrics(job, results)
416455
return results
417456

418457
def delete_job(
@@ -441,7 +480,9 @@ def delete_job(
441480
structlog.contextvars.bind_contextvars(job_id=job_id)
442481
user_uid = auth.authenticate_user(auth_header, portal_header)
443482
structlog.contextvars.bind_contextvars(user_id=user_uid)
444-
compute_sessionmaker = db_utils.get_compute_sessionmaker()
483+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
484+
mode=db_utils.ConnectionMode.write
485+
)
445486
with compute_sessionmaker() as compute_session:
446487
job = utils.get_job_from_broker_db(job_id=job_id, session=compute_session)
447488
auth.verify_permission(user_uid, job)

cads_processing_api_service/db_utils.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License
1616

17+
import enum
1718
import functools
1819

1920
import cads_broker.config
@@ -22,18 +23,39 @@
2223
import sqlalchemy.orm
2324

2425

26+
class ConnectionMode(str, enum.Enum):
27+
"""Database connection mode."""
28+
29+
read = "read"
30+
write = "write"
31+
32+
2533
@functools.lru_cache()
26-
def get_compute_sessionmaker() -> sqlalchemy.orm.sessionmaker[sqlalchemy.orm.Session]:
34+
def get_compute_sessionmaker(
35+
mode: ConnectionMode = ConnectionMode.write,
36+
) -> sqlalchemy.orm.sessionmaker[sqlalchemy.orm.Session]:
2737
"""Get an sqlalchemy.orm.sessionmaker object bound to the Broker database.
2838
39+
Parameters
40+
----------
41+
mode: ConnectionMode
42+
Connection mode to the database. If ConnectionMode.read, the sessionmaker
43+
will open a connection to a read-only hostname.
44+
2945
Returns
3046
-------
3147
sqlalchemy.orm.sessionmaker
3248
sqlalchemy.orm.sessionmaker object bound to the Broker database.
3349
"""
3450
broker_settings = cads_broker.config.ensure_settings()
51+
if mode == ConnectionMode.write:
52+
connection_string = broker_settings.connection_string
53+
elif mode == ConnectionMode.read:
54+
connection_string = broker_settings.connection_string_read
55+
else:
56+
raise ValueError(f"Invalid connection mode: {str(mode)}")
3557
broker_engine = sqlalchemy.create_engine(
36-
broker_settings.connection_string,
58+
connection_string,
3759
pool_timeout=broker_settings.pool_timeout,
3860
pool_recycle=broker_settings.pool_recycle,
3961
)

0 commit comments

Comments
 (0)