Skip to content

Commit 2839a35

Browse files
authored
Merge pull request #248 from ecmwf-projects/COPDS-2651-delete-jobs
Add delete jobs endpoint
2 parents 6b16e78 + 7c99c30 commit 2839a35

File tree

6 files changed

+212
-131
lines changed

6 files changed

+212
-131
lines changed

cads_processing_api_service/constraints.py

Lines changed: 0 additions & 40 deletions
This file was deleted.

cads_processing_api_service/costing.py

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
import cads_adaptors
2121
import cads_adaptors.exceptions
22-
import cads_catalogue
23-
import fastapi
2422

25-
from . import adaptors, db_utils, exceptions, models, utils
23+
from . import adaptors, exceptions, models
2624

2725
COST_THRESHOLDS = {"api": "max_costs", "ui": "max_costs_portal"}
2826

@@ -32,61 +30,6 @@ class RequestOrigin(str, enum.Enum):
3230
ui = "ui"
3331

3432

35-
@exceptions.exception_logger
36-
def estimate_cost(
37-
process_id: str = fastapi.Path(..., description="Process identifier."),
38-
request_origin: RequestOrigin = fastapi.Query("api", include_in_schema=False),
39-
mandatory_inputs: bool = fastapi.Query(False, include_in_schema=False),
40-
execution_content: models.Execute = fastapi.Body(...),
41-
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
42-
) -> models.RequestCost:
43-
"""
44-
Estimate the cost with the highest cost/limit ratio of the request.
45-
46-
Parameters
47-
----------
48-
process_id : str
49-
Process ID.
50-
execution_content : models.Execute
51-
Request content.
52-
53-
Returns
54-
-------
55-
models.RequestCost
56-
Info on the cost with the highest cost/limit ratio.
57-
"""
58-
request = execution_content.model_dump()
59-
table = cads_catalogue.database.Resource
60-
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
61-
db_utils.ConnectionMode.read
62-
)
63-
with catalogue_sessionmaker() as catalogue_session:
64-
dataset = utils.lookup_resource_by_id(
65-
resource_id=process_id,
66-
table=table,
67-
session=catalogue_session,
68-
portals=portals,
69-
)
70-
adaptor_properties = adaptors.get_adaptor_properties(dataset)
71-
costing_info = compute_costing(
72-
request.get("inputs", {}), adaptor_properties, request_origin
73-
)
74-
cost = compute_highest_cost_limit_ratio(costing_info)
75-
if costing_info.cost_bar_steps:
76-
cost.cost_bar_steps = costing_info.cost_bar_steps
77-
try:
78-
check_request_validity(
79-
request=request,
80-
request_origin=request_origin,
81-
mandatory_inputs=mandatory_inputs,
82-
adaptor_properties=adaptor_properties,
83-
)
84-
except exceptions.InvalidRequest as exc:
85-
cost.request_is_valid = False
86-
cost.invalid_reason = exc.detail
87-
return cost
88-
89-
9033
def check_request_validity(
9134
request: dict[str, Any],
9235
request_origin: RequestOrigin,
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
"""Additional endpoints for the CADS Processing API Service."""
2+
3+
from typing import Any
4+
5+
import cads_adaptors
6+
import cads_adaptors.exceptions
7+
import cads_broker
8+
import cads_catalogue
9+
import fastapi
10+
import ogc_api_processes_fastapi.exceptions
11+
import structlog
12+
13+
from . import (
14+
adaptors,
15+
auth,
16+
config,
17+
costing,
18+
db_utils,
19+
exceptions,
20+
limits,
21+
models,
22+
translators,
23+
utils,
24+
)
25+
26+
SETTINGS = config.settings
27+
28+
29+
@exceptions.exception_logger
30+
def apply_constraints(
31+
process_id: str = fastapi.Path(..., description="Process identifier."),
32+
execution_content: models.Execute = fastapi.Body(...),
33+
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
34+
) -> dict[str, Any]:
35+
request = execution_content.model_dump()
36+
table = cads_catalogue.database.Resource
37+
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
38+
db_utils.ConnectionMode.read
39+
)
40+
with catalogue_sessionmaker() as catalogue_session:
41+
dataset = utils.lookup_resource_by_id(
42+
resource_id=process_id,
43+
table=table,
44+
session=catalogue_session,
45+
portals=portals,
46+
)
47+
adaptor: cads_adaptors.AbstractAdaptor = adaptors.instantiate_adaptor(dataset)
48+
try:
49+
constraints: dict[str, Any] = adaptor.apply_constraints(
50+
request.get("inputs", {})
51+
)
52+
except (
53+
cads_adaptors.exceptions.ParameterError,
54+
cads_adaptors.exceptions.InvalidRequest,
55+
) as exc:
56+
raise exceptions.InvalidParameter(detail=str(exc)) from exc
57+
58+
return constraints
59+
60+
61+
@exceptions.exception_logger
62+
def estimate_cost(
63+
process_id: str = fastapi.Path(..., description="Process identifier."),
64+
request_origin: costing.RequestOrigin = fastapi.Query(
65+
"api", include_in_schema=False
66+
),
67+
mandatory_inputs: bool = fastapi.Query(False, include_in_schema=False),
68+
execution_content: models.Execute = fastapi.Body(...),
69+
portals: tuple[str] | None = fastapi.Depends(utils.get_portals),
70+
) -> models.RequestCost:
71+
"""
72+
Estimate the cost with the highest cost/limit ratio of the request.
73+
74+
Parameters
75+
----------
76+
process_id : str
77+
Process ID.
78+
execution_content : models.Execute
79+
Request content.
80+
81+
Returns
82+
-------
83+
models.RequestCost
84+
Info on the cost with the highest cost/limit ratio.
85+
"""
86+
request = execution_content.model_dump()
87+
table = cads_catalogue.database.Resource
88+
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
89+
db_utils.ConnectionMode.read
90+
)
91+
with catalogue_sessionmaker() as catalogue_session:
92+
dataset = utils.lookup_resource_by_id(
93+
resource_id=process_id,
94+
table=table,
95+
session=catalogue_session,
96+
portals=portals,
97+
)
98+
adaptor_properties = adaptors.get_adaptor_properties(dataset)
99+
costing_info = costing.compute_costing(
100+
request.get("inputs", {}), adaptor_properties, request_origin
101+
)
102+
cost = costing.compute_highest_cost_limit_ratio(costing_info)
103+
if costing_info.cost_bar_steps:
104+
cost.cost_bar_steps = costing_info.cost_bar_steps
105+
try:
106+
costing.check_request_validity(
107+
request=request,
108+
request_origin=request_origin,
109+
mandatory_inputs=mandatory_inputs,
110+
adaptor_properties=adaptor_properties,
111+
)
112+
except exceptions.InvalidRequest as exc:
113+
cost.request_is_valid = False
114+
cost.invalid_reason = exc.detail
115+
return cost
116+
117+
118+
@exceptions.exception_logger
119+
def get_api_request(
120+
process_id: str = fastapi.Path(..., description="Process identifier."),
121+
request: dict[str, Any] = fastapi.Body(...),
122+
) -> dict[str, str]:
123+
"""Get CADS API request equivalent to the provided processing request.
124+
125+
Parameters
126+
----------
127+
process_id : str, optional
128+
Process identifier, by default fastapi.Path(...)
129+
request : dict[str, Any], optional
130+
Request, by default fastapi.Body(...)
131+
132+
Returns
133+
-------
134+
dict[str, str]
135+
CDS API request.
136+
"""
137+
api_request_template = SETTINGS.api_request_template
138+
api_request = translators.format_api_request(
139+
api_request_template, process_id, request
140+
)
141+
return {"api_request": api_request}
142+
143+
144+
@exceptions.exception_logger
145+
def delete_jobs(
146+
request: models.DeleteJobs = fastapi.Body(...),
147+
auth_info: models.AuthInfo = fastapi.Depends(auth.get_auth_info),
148+
) -> models.JobList:
149+
"""Delete jobs from the processing queue.
150+
151+
Parameters
152+
----------
153+
request : models.DeleteJobsRequest
154+
Request body containing job IDs to delete.
155+
156+
Returns
157+
-------
158+
models.JobList
159+
List of jobs that were successfully deleted.
160+
"""
161+
structlog.contextvars.bind_contextvars(user_uid=auth_info.user_uid)
162+
limits.check_rate_limits(
163+
SETTINGS.rate_limits.jobs.delete,
164+
auth_info,
165+
)
166+
job_ids = request.job_ids
167+
compute_sessionmaker = db_utils.get_compute_sessionmaker(
168+
mode=db_utils.ConnectionMode.write
169+
)
170+
jobs = []
171+
with compute_sessionmaker() as compute_session:
172+
for job_id in job_ids:
173+
try:
174+
job = utils.get_job_from_broker_db(
175+
job_id=job_id, session=compute_session
176+
)
177+
except ogc_api_processes_fastapi.exceptions.NoSuchJob:
178+
continue
179+
try:
180+
auth.verify_permission(auth_info.user_uid, job)
181+
except exceptions.PermissionDenied:
182+
continue
183+
job = cads_broker.database.set_dismissed_request(
184+
request_uid=job_id, session=compute_session
185+
)
186+
jobs.append(utils.make_status_info(job))
187+
job_list = models.JobList(
188+
jobs=jobs,
189+
)
190+
return job_list

cads_processing_api_service/main.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@
2929
from . import (
3030
clients,
3131
config,
32-
constraints,
33-
costing,
32+
endpoints,
3433
exceptions,
3534
middlewares,
36-
translators,
3735
)
3836

3937
SETTINGS = config.settings
@@ -80,24 +78,31 @@ async def lifespan(application: fastapi.FastAPI) -> AsyncGenerator[Any, None]:
8078
app.router.lifespan_context = lifespan
8179
app.router.add_api_route(
8280
"/processes/{process_id}/constraints",
83-
constraints.apply_constraints,
81+
endpoints.apply_constraints,
8482
description="Apply constraints to the submitted process execution.",
8583
methods=["POST"],
8684
)
8785
app.router.add_api_route(
8886
"/processes/{process_id}/costing",
89-
costing.estimate_cost,
87+
endpoints.estimate_cost,
9088
description="Estimate costs of the submitted process execution.",
9189
methods=["POST"],
9290
response_model_exclude_unset=True,
9391
)
9492
app.router.add_api_route(
9593
"/processes/{process_id}/api-request",
96-
translators.get_api_request,
94+
endpoints.get_api_request,
9795
description="Get API request equivalent to the submitted process execution json.",
9896
methods=["POST"],
9997
)
100-
98+
app.router.add_api_route(
99+
"/jobs/delete",
100+
endpoints.delete_jobs,
101+
description="Delete jobs by their identifiers.",
102+
methods=["POST"],
103+
response_model_exclude_unset=True,
104+
response_model_exclude_none=True,
105+
)
101106
app.router.add_api_route(
102107
"/metrics", starlette_exporter.handle_metrics, include_in_schema=False
103108
)

cads_processing_api_service/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,11 @@ class RequestCost(pydantic.BaseModel):
9494

9595
class Execute(ogc_api_processes_fastapi.models.Execute):
9696
inputs: dict[str, Any] | None = None
97+
98+
99+
class DeleteJobs(pydantic.BaseModel):
100+
"""Request body for DELETE /jobs."""
101+
102+
job_ids: list[str] = pydantic.Field(
103+
..., description="Identifiers of the jobs to delete."
104+
)

0 commit comments

Comments
 (0)