Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Optionnally provide request HTTP headers to processors
Browse files Browse the repository at this point in the history
vprivat-ads committed Jan 30, 2025
1 parent 26263c8 commit 51c7adf
Showing 10 changed files with 117 additions and 63 deletions.
7 changes: 7 additions & 0 deletions pygeoapi-config.yml
Original file line number Diff line number Diff line change
@@ -56,6 +56,13 @@ server:
# output_dir: /tmp/
# ogc_schemas_location: /opt/schemas.opengis.net
admin: false # enable admin api
coverages: true # enable ogcapi-coverages
edr: true # enable ogcapi-edr
features: true # enable ogcapi-features
maps: true # enable ogcapi-maps
processes: true # enable ogcapi-processes
stac: true # enable ogcapi-stac
tiles: true # enable ogcapi-tiles

logging:
level: ERROR
44 changes: 31 additions & 13 deletions pygeoapi/api/__init__.py
Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@
DEFAULT_STORAGE_CRS = DEFAULT_CRS


def all_apis() -> dict:
def all_apis(server_cfg: dict = {}) -> dict:
"""
Return all supported API modules
@@ -131,18 +131,36 @@ def all_apis() -> dict:
:returns: `dict` of API provider type, API module
"""

from . import (coverages, environmental_data_retrieval, itemtypes, maps,
processes, tiles, stac)

return {
'coverage': coverages,
'edr': environmental_data_retrieval,
'itemtypes': itemtypes,
'map': maps,
'process': processes,
'tile': tiles,
'stac': stac
}
apis: dict = {}

from . import itemtypes
apis['itemtypes'] = itemtypes

if server_cfg.get('coverages', True):
from . import coverages
apis['coverage'] = coverages

if server_cfg.get('edr', True):
from . import environmental_data_retrieval
apis['edr'] = environmental_data_retrieval

if server_cfg.get('maps', True):
from . import maps
apis['map'] = maps

if server_cfg.get('processes', True):
from . import processes
apis['process'] = processes

if server_cfg.get('tiles', True):
from . import tiles
apis['tile'] = tiles

if server_cfg.get('stac', True):
from . import stac
apis['stac'] = stac

return apis


def apply_gzip(headers: dict, content: Union[str, bytes]) -> Union[str, bytes]:
3 changes: 2 additions & 1 deletion pygeoapi/api/processes.py
Original file line number Diff line number Diff line change
@@ -484,7 +484,8 @@ def execute_process(api: API, request: APIRequest,
process_id, data_dict, execution_mode=execution_mode,
requested_outputs=requested_outputs,
subscriber=subscriber,
requested_response=requested_response)
requested_response=requested_response,
request_headers=request.headers)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})

86 changes: 45 additions & 41 deletions pygeoapi/openapi.py
Original file line number Diff line number Diff line change
@@ -443,6 +443,9 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict:
items_f = deepcopy(oas['components']['parameters']['f'])
items_f['schema']['enum'].append('csv')

tiles_enabled = cfg['server'].get('tiles', True)
admin_enabled = cfg['server'].get('admin', False)

LOGGER.debug('setting up datasets')

for k, v in get_visible_collections(cfg).items():
@@ -484,58 +487,59 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict:
}
}

oas['components']['responses'].update({
'Tiles': {
'description': 'Retrieves the tiles description for this collection', # noqa
'content': {
'application/json': {
'schema': {
'$ref': '#/components/schemas/tiles'
if tiles_enabled:
oas['components']['responses'].update({
'Tiles': {
'description': 'Retrieves the tiles description for this collection', # noqa
'content': {
'application/json': {
'schema': {
'$ref': '#/components/schemas/tiles'
}
}
}
}
}
}
)

oas['components']['schemas'].update({
'tilematrixsetlink': {
'type': 'object',
'required': ['tileMatrixSet'],
'properties': {
'tileMatrixSet': {
'type': 'string'
},
'tileMatrixSetURI': {
'type': 'string'
)

oas['components']['schemas'].update({
'tilematrixsetlink': {
'type': 'object',
'required': ['tileMatrixSet'],
'properties': {
'tileMatrixSet': {
'type': 'string'
},
'tileMatrixSetURI': {
'type': 'string'
}
}
}
},
'tiles': {
'type': 'object',
'required': [
'tileMatrixSetLinks',
'links'
],
'properties': {
'tileMatrixSetLinks': {
'type': 'array',
'items': {
'$ref': '#/components/schemas/tilematrixsetlink' # noqa
},
'tiles': {
'type': 'object',
'required': [
'tileMatrixSetLinks',
'links'
],
'properties': {
'tileMatrixSetLinks': {
'type': 'array',
'items': {
'$ref': '#/components/schemas/tilematrixsetlink' # noqa
}
},
'links': {
'type': 'array',
'items': {'$ref': f"{OPENAPI_YAML['oapit']}#/components/schemas/link"} # noqa
}
},
'links': {
'type': 'array',
'items': {'$ref': f"{OPENAPI_YAML['oapit']}#/components/schemas/link"} # noqa
}
}
}
}
)
)

oas['paths'] = paths

for api_name, api_module in all_apis().items():
for api_name, api_module in all_apis(cfg['server']).items():
LOGGER.debug(f'Adding OpenAPI definitions for {api_name}')

try:
@@ -548,7 +552,7 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict:
else:
LOGGER.warning(f'Resource not added to OpenAPI: {err}')

if cfg['server'].get('admin', False):
if admin_enabled:
schema_dict = get_config_schema()
oas['definitions'] = schema_dict['definitions']
LOGGER.debug('Adding admin endpoints')
6 changes: 5 additions & 1 deletion pygeoapi/process/base.py
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict):
self.name = processor_def['name']
self.metadata = process_metadata
self.supports_outputs = False
self.supports_request_headers = False

def set_job_id(self, job_id: str) -> None:
"""
@@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None:

pass

def execute(self, data: dict, outputs: Optional[dict] = None
def execute(self, data: dict, outputs: Optional[dict] = None,
request_headers: Optional[dict] = None
) -> Tuple[str, Any]:
"""
execute the process
@@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None
required outputs - defaults to all outputs.
The value of any key may be an object and include the
property `transmissionMode` - defaults to `value`.
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of MIME type and process response
(string or bytes, or dict)
"""
26 changes: 20 additions & 6 deletions pygeoapi/process/manager/base.py
Original file line number Diff line number Diff line change
@@ -76,6 +76,7 @@ def __init__(self, manager_def: dict):
self.name = manager_def['name']
self.is_async = False
self.supports_subscribing = False
self.supports_request_headers = False
self.connection = manager_def.get('connection')
self.output_dir = manager_def.get('output_dir')

@@ -194,7 +195,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, None, JobStatus]:
"""
This private execution handler executes a process in a background
@@ -215,13 +217,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of None (i.e. initial response payload)
and JobStatus.accepted (i.e. initial job status)
"""

args = (p, job_id, data_dict, requested_outputs, subscriber,
requested_response)
requested_response, request_headers)

_process = dummy.Process(target=self._execute_handler_sync, args=args)
_process.start()
@@ -232,7 +236,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus]:
"""
Synchronous execution handler
@@ -254,16 +259,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of MIME type, response payload and status
"""

extra_execute_parameters = {}

# only pass requested_outputs if supported,
# only pass requested_outputs and request_headers if supported,
# otherwise this breaks existing processes
if p.supports_outputs:
extra_execute_parameters['outputs'] = requested_outputs
if p.supports_request_headers:
extra_execute_parameters['request_headers'] = request_headers

self._send_in_progress_notification(subscriber)

@@ -358,7 +367,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
@@ -377,6 +387,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:raises UnknownProcessError: if the input process_id does not
@@ -443,10 +455,12 @@ def execute_process(
}
self.add_job(job_metadata)

# only pass subscriber if supported, otherwise this breaks
# only pass subscriber and headers if supported, otherwise this breaks
# existing managers
if self.supports_subscribing:
extra_execute_handler_parameters['subscriber'] = subscriber
if self.supports_request_headers:
extra_execute_handler_parameters['request_headers'] = request_headers # noqa

# TODO: handler's response could also be allowed to include more HTTP
# headers
5 changes: 4 additions & 1 deletion pygeoapi/process/manager/dummy.py
Original file line number Diff line number Diff line change
@@ -79,7 +79,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
@@ -95,6 +96,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:raises UnknownProcessError: if the input process_id does not
correspond to a known process
1 change: 1 addition & 0 deletions pygeoapi/process/manager/mongodb_.py
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

def _connect(self):
try:
1 change: 1 addition & 0 deletions pygeoapi/process/manager/postgresql.py
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ def __init__(self, manager_def: dict):
self.is_async = True
self.id_field = 'identifier'
self.supports_subscribing = True
self.supports_request_headers = True
self.connection = manager_def['connection']

try:
1 change: 1 addition & 0 deletions pygeoapi/process/manager/tinydb_.py
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ def __init__(self, manager_def: dict):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

@contextmanager
def _db(self):

0 comments on commit 51c7adf

Please sign in to comment.