diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c0d64cf9..35d893f9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,11 +17,17 @@ repos: # W503 black conflicts with "line break before operator" rule # E203 black conflicts with "whitespace before ':'" rule '--ignore=E501,W503,E203,C901' ] -# - repo: https://github.com/pre-commit/mirrors-mypy -# rev: v0.942 -# hooks: -# - id: mypy -# args: [--no-strict-optional, --ignore-missing-imports] + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.942 + hooks: + - id: mypy + exclude: /tests/ + # --strict + args: [--no-strict-optional, --ignore-missing-imports, --implicit-reexport] + additional_dependencies: [ + "types-attrs", + "types-requests" + ] - repo: https://github.com/PyCQA/pydocstyle rev: 6.1.1 hooks: diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f380de..87b8da38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. 
- Default to Python 3.10 - Default to Elasticsearch 8.x +- Collection objects are now stored in `collections` index rather than `stac_collections` index +- Item objects are no longer stored in `stac_items`, but in indices per collection named `items_{collection_id}` ### Removed diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index f4443941..99dec7cb 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -7,8 +7,8 @@ CoreClient, TransactionsClient, ) +from stac_fastapi.elasticsearch.database_logic import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension -from stac_fastapi.elasticsearch.indexes import IndexesClient from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, @@ -44,11 +44,11 @@ @app.on_event("startup") -async def _startup_event(): - await IndexesClient().create_indexes() +async def _startup_event() -> None: + await create_collection_index() -def run(): +def run() -> None: """Run app from command line using uvicorn if available.""" try: import uvicorn diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 8c248f47..8fda8032 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -2,7 +2,7 @@ import os from typing import Any, Dict, Set -from elasticsearch import AsyncElasticsearch, Elasticsearch +from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore from stac_fastapi.types.config import ApiSettings diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index b72efbbd..6b8c04cd 
100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -52,31 +52,30 @@ class CoreClient(AsyncBaseCoreClient): async def all_collections(self, **kwargs) -> Collections: """Read all collections from the database.""" base_url = str(kwargs["request"].base_url) - collection_list = await self.database.get_all_collections() - collection_list = [ - self.collection_serializer.db_to_stac(c, base_url=base_url) - for c in collection_list - ] - - links = [ - { - "rel": Relations.root.value, - "type": MimeTypes.json, - "href": base_url, - }, - { - "rel": Relations.parent.value, - "type": MimeTypes.json, - "href": base_url, - }, - { - "rel": Relations.self.value, - "type": MimeTypes.json, - "href": urljoin(base_url, "collections"), - }, - ] - return Collections(collections=collection_list, links=links) + return Collections( + collections=[ + self.collection_serializer.db_to_stac(c, base_url=base_url) + for c in await self.database.get_all_collections() + ], + links=[ + { + "rel": Relations.root.value, + "type": MimeTypes.json, + "href": base_url, + }, + { + "rel": Relations.parent.value, + "type": MimeTypes.json, + "href": base_url, + }, + { + "rel": Relations.self.value, + "type": MimeTypes.json, + "href": urljoin(base_url, "collections"), + }, + ], + ) @overrides async def get_collection(self, collection_id: str, **kwargs) -> Collection: @@ -100,6 +99,8 @@ async def item_collection( limit=limit, token=token, sort=None, + collection_ids=[collection_id], + ignore_unavailable=False, ) items = [ @@ -276,6 +277,7 @@ async def post_search( limit=limit, token=search_request.token, # type: ignore sort=sort, + collection_ids=search_request.collections, ) items = [ @@ -341,8 +343,11 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: processed_items = [ bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore ] + + # not a great way to 
get the collection_id-- should be part of the method signature + collection_id = processed_items[0]["collection"] await self.database.bulk_async( - processed_items, refresh=kwargs.get("refresh", False) + collection_id, processed_items, refresh=kwargs.get("refresh", False) ) return None # type: ignore @@ -355,12 +360,14 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: async def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: """Update item.""" base_url = str(kwargs["request"].base_url) + collection_id = item["collection"] + now = datetime_type.now(timezone.utc).isoformat().replace("+00:00", "Z") item["properties"]["updated"] = str(now) - await self.database.check_collection_exists(collection_id=item["collection"]) + await self.database.check_collection_exists(collection_id) # todo: index instead of delete and create - await self.delete_item(item_id=item["id"], collection_id=item["collection"]) + await self.delete_item(item_id=item["id"], collection_id=collection_id) await self.create_item(item=item, **kwargs) return ItemSerializer.db_to_stac(item, base_url) @@ -440,6 +447,11 @@ def bulk_item_insert( self.preprocess_item(item, base_url) for item in items.items.values() ] - self.database.bulk_sync(processed_items, refresh=kwargs.get("refresh", False)) + # not a great way to get the collection_id-- should be part of the method signature + collection_id = processed_items[0]["collection"] + + self.database.bulk_sync( + collection_id, processed_items, refresh=kwargs.get("refresh", False) + ) return f"Successfully added {len(processed_items)} Items." 
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 48d7cf54..54a86357 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -2,22 +2,12 @@ import asyncio import logging from base64 import urlsafe_b64decode, urlsafe_b64encode -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union import attr from elasticsearch_dsl import Q, Search -from geojson_pydantic.geometries import ( - GeometryCollection, - LineString, - MultiLineString, - MultiPoint, - MultiPolygon, - Point, - Polygon, -) -import elasticsearch -from elasticsearch import helpers +from elasticsearch import exceptions, helpers # type: ignore from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( @@ -30,8 +20,10 @@ NumType = Union[float, int] -ITEMS_INDEX = "stac_items" -COLLECTIONS_INDEX = "stac_collections" +COLLECTIONS_INDEX = "collections" +ITEMS_INDEX_PREFIX = "items_" + +DEFAULT_INDICES = f"*,-*kibana*,-{COLLECTIONS_INDEX}" DEFAULT_SORT = { "properties.datetime": {"order": "desc"}, @@ -39,8 +31,139 @@ "collection": {"order": "desc"}, } +ES_ITEMS_SETTINGS = { + "index": { + "sort.field": list(DEFAULT_SORT.keys()), + "sort.order": [v["order"] for v in DEFAULT_SORT.values()], + } +} + +ES_MAPPINGS_DYNAMIC_TEMPLATES = [ + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + { + "descriptions": { + "match_mapping_type": "string", + "match": "description", + "mapping": {"type": "text"}, + } + }, + { + "titles": { + "match_mapping_type": "string", + "match": "title", + "mapping": {"type": "text"}, + } + }, + # Projection Extension 
https://github.com/stac-extensions/projection + {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, + { + "proj_projjson": { + "match": "proj:projjson", + "mapping": {"type": "object", "enabled": False}, + } + }, + { + "proj_centroid": { + "match": "proj:centroid", + "mapping": {"type": "geo_point"}, + } + }, + { + "proj_geometry": { + "match": "proj:geometry", + "mapping": {"type": "geo_shape"}, + } + }, + { + "no_index_href": { + "match": "href", + "mapping": {"type": "text", "index": False}, + } + }, + # Default all other strings not otherwise specified to keyword + {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, + {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, +] + +ES_ITEMS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "id": {"type": "keyword"}, + "collection": {"type": "keyword"}, + "geometry": {"type": "geo_shape"}, + "assets": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "properties": { + "type": "object", + "properties": { + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + "datetime": {"type": "date"}, + "start_datetime": {"type": "date"}, + "end_datetime": {"type": "date"}, + "created": {"type": "date"}, + "updated": {"type": "date"}, + # Satellite Extension https://github.com/stac-extensions/sat + "sat:absolute_orbit": {"type": "integer"}, + "sat:relative_orbit": {"type": "integer"}, + }, + }, + }, +} + +ES_COLLECTIONS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "extent.spatial.bbox": {"type": "long"}, + "extent.temporal.interval": {"type": "date"}, + "providers": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "item_assets": {"type": "object", "enabled": False}, + }, +} + + +def 
index_by_collection_id(collection_id: str) -> str: + """Translate a collection id into an ES index name.""" + return f"{ITEMS_INDEX_PREFIX}{collection_id}" + + +def indices(collection_ids: Optional[List[str]]) -> str: + """Get a comma-separated string value of indexes for a given list of collection ids.""" + if collection_ids is None: + return DEFAULT_INDICES + else: + return ",".join([f"{ITEMS_INDEX_PREFIX}{c.strip()}" for c in collection_ids]) + + +async def create_collection_index() -> None: + """Create the index for Collections.""" + await AsyncElasticsearchSettings().create_client.indices.create( + index=COLLECTIONS_INDEX, + mappings=ES_COLLECTIONS_MAPPINGS, + ignore=400, # ignore 400 already exists code + ) + + +async def create_item_index(collection_id: str): + """Create the index for a Collection's Items.""" + await AsyncElasticsearchSettings().create_client.indices.create( + index=index_by_collection_id(collection_id), + mappings=ES_ITEMS_MAPPINGS, + settings=ES_ITEMS_SETTINGS, + ignore=400, # ignore 400 already exists code + ) + + +async def delete_item_index(collection_id: str): + """Delete the index for a Collection's Items.""" + await AsyncElasticsearchSettings().create_client.indices.delete( + index=index_by_collection_id(collection_id) + ) + -def bbox2polygon(b0, b1, b2, b3): +def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: """Transform bbox to polygon.""" return [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]] @@ -50,6 +173,26 @@ def mk_item_id(item_id: str, collection_id: str): return f"{item_id}|{collection_id}" +def mk_actions(collection_id: str, processed_items: List[Item]): + """Make the Elasticsearch bulk action for a list of Items.""" + return [ + { + "_index": index_by_collection_id(collection_id), + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in processed_items + ] + + +# stac_pydantic classes extend _GeometryBase, which doesn't have a type 
field, +# So create our own Protocol for typing +# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection] +class Geometry(Protocol): # noqa + type: str + coordinates: Any + + @attr.s class DatabaseLogic: """Database logic.""" @@ -66,7 +209,7 @@ class DatabaseLogic: """CORE LOGIC""" - async def get_all_collections(self) -> List[Dict[str, Any]]: + async def get_all_collections(self) -> Iterable[Dict[str, Any]]: """Database logic to retrieve a list of all collections.""" # https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/65 # collections should be paginated, but at least return more than the default 10 for now @@ -77,9 +220,10 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: """Database logic to retrieve a single item.""" try: item = await self.client.get( - index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id) + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), ) - except elasticsearch.exceptions.NotFoundError: + except exceptions.NotFoundError: raise NotFoundError( f"Item {item_id} does not exist in Collection {collection_id}" ) @@ -138,15 +282,7 @@ def apply_bbox_filter(search: Search, bbox: List): @staticmethod def apply_intersects_filter( search: Search, - intersects: Union[ - Point, - MultiPoint, - LineString, - MultiLineString, - Polygon, - MultiPolygon, - GeometryCollection, - ], + intersects: Geometry, ): """Database logic to search a geojson object.""" return search.filter( @@ -190,6 +326,8 @@ async def execute_search( limit: int, token: Optional[str], sort: Optional[Dict[str, Dict[str, str]]], + collection_ids: Optional[List[str]], + ignore_unavailable: bool = True, ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: """Database logic to execute search with limit.""" search_after = None @@ -198,9 +336,12 @@ async def execute_search( query = search.query.to_dict() if search.query else None + index_param = 
indices(collection_ids) + search_task = asyncio.create_task( self.client.search( - index=ITEMS_INDEX, + index=index_param, + ignore_unavailable=ignore_unavailable, query=query, sort=sort or DEFAULT_SORT, search_after=search_after, @@ -209,10 +350,17 @@ async def execute_search( ) count_task = asyncio.create_task( - self.client.count(index=ITEMS_INDEX, body=search.to_dict(count=True)) + self.client.count( + index=index_param, + ignore_unavailable=ignore_unavailable, + body=search.to_dict(count=True), + ) ) - es_response = await search_task + try: + es_response = await search_task + except exceptions.NotFoundError: + raise NotFoundError(f"Collections '{collection_ids}' do not exist") hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) @@ -229,7 +377,7 @@ async def execute_search( if count_task.done(): try: maybe_count = count_task.result().get("count") - except Exception as e: # type: ignore + except Exception as e: logger.error(f"Count task failed: {e}") return items, maybe_count, next_token @@ -246,7 +394,8 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: await self.check_collection_exists(collection_id=item["collection"]) if await self.client.exists( - index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"]) + index=index_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), ): raise ConflictError( f"Item {item['id']} in collection {item['collection']} already exists" @@ -256,15 +405,17 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: def sync_prep_create_item(self, item: Item, base_url: str) -> Item: """Database logic for prepping an item for insertion.""" + item_id = item["id"] collection_id = item["collection"] if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") if self.sync_client.exists( - index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"]) + 
index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), ): raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" + f"Item {item_id} in collection {collection_id} already exists" ) return self.item_serializer.stac_to_db(item, base_url) @@ -272,16 +423,18 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item: async def create_item(self, item: Item, refresh: bool = False): """Database logic for creating one item.""" # todo: check if collection exists, but cache + item_id = item["id"] + collection_id = item["collection"] es_resp = await self.client.index( - index=ITEMS_INDEX, - id=mk_item_id(item["id"], item["collection"]), + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), document=item, refresh=refresh, ) if (meta := es_resp.get("meta")) and meta.get("status") == 409: raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" + f"Item {item_id} in collection {collection_id} already exists" ) async def delete_item( @@ -290,34 +443,38 @@ async def delete_item( """Database logic for deleting one item.""" try: await self.client.delete( - index=ITEMS_INDEX, + index=index_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), refresh=refresh, ) - except elasticsearch.exceptions.NotFoundError: + except exceptions.NotFoundError: raise NotFoundError( f"Item {item_id} in collection {collection_id} not found" ) async def create_collection(self, collection: Collection, refresh: bool = False): """Database logic for creating one collection.""" - if await self.client.exists(index=COLLECTIONS_INDEX, id=collection["id"]): - raise ConflictError(f"Collection {collection['id']} already exists") + collection_id = collection["id"] + + if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): + raise ConflictError(f"Collection {collection_id} already exists") await self.client.index( 
index=COLLECTIONS_INDEX, - id=collection["id"], + id=collection_id, document=collection, refresh=refresh, ) + await create_item_index(collection_id) + async def find_collection(self, collection_id: str) -> Collection: """Database logic to find and return a collection.""" try: collection = await self.client.get( index=COLLECTIONS_INDEX, id=collection_id ) - except elasticsearch.exceptions.NotFoundError: + except exceptions.NotFoundError: raise NotFoundError(f"Collection {collection_id} not found") return collection["_source"] @@ -328,38 +485,36 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) + await delete_item_index(collection_id) - async def bulk_async(self, processed_items, refresh: bool = False): + async def bulk_async( + self, collection_id: str, processed_items: List[Item], refresh: bool = False + ) -> None: """Database logic for async bulk item insertion.""" await asyncio.get_event_loop().run_in_executor( None, lambda: helpers.bulk( - self.sync_client, self._mk_actions(processed_items), refresh=refresh + self.sync_client, + mk_actions(collection_id, processed_items), + refresh=refresh, ), ) - def bulk_sync(self, processed_items, refresh: bool = False): + def bulk_sync( + self, collection_id: str, processed_items: List[Item], refresh: bool = False + ) -> None: """Database logic for sync bulk item insertion.""" helpers.bulk( - self.sync_client, self._mk_actions(processed_items), refresh=refresh + self.sync_client, + mk_actions(collection_id, processed_items), + refresh=refresh, ) - @staticmethod - def _mk_actions(processed_items): - return [ - { - "_index": ITEMS_INDEX, - "_id": mk_item_id(item["id"], item["collection"]), - "_source": item, - } - for item in processed_items - ] - # DANGER async def delete_items(self) -> None: """Danger. 
this is only for tests.""" await self.client.delete_by_query( - index=ITEMS_INDEX, + index=DEFAULT_INDICES, body={"query": {"match_all": {}}}, wait_for_completion=True, ) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/indexes.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/indexes.py deleted file mode 100644 index b02d2129..00000000 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/indexes.py +++ /dev/null @@ -1,115 +0,0 @@ -"""index management client.""" - -import logging - -import attr - -from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings -from stac_fastapi.elasticsearch.database_logic import COLLECTIONS_INDEX, ITEMS_INDEX -from stac_fastapi.elasticsearch.session import Session - -logger = logging.getLogger(__name__) - - -@attr.s -class IndexesClient: - """Elasticsearch client to handle index creation.""" - - session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) - - ES_MAPPINGS_DYNAMIC_TEMPLATES = [ - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - { - "descriptions": { - "match_mapping_type": "string", - "match": "description", - "mapping": {"type": "text"}, - } - }, - { - "titles": { - "match_mapping_type": "string", - "match": "title", - "mapping": {"type": "text"}, - } - }, - # Projection Extension https://github.com/stac-extensions/projection - {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, - { - "proj_projjson": { - "match": "proj:projjson", - "mapping": {"type": "object", "enabled": False}, - } - }, - { - "proj_centroid": { - "match": "proj:centroid", - "mapping": {"type": "geo_point"}, - } - }, - { - "proj_geometry": { - "match": "proj:geometry", - "mapping": {"type": "geo_shape"}, - } - }, - { - "no_index_href": { - "match": "href", - "mapping": {"type": "text", "index": False}, - } - }, - # Default all other strings not otherwise specified to keyword - {"strings": {"match_mapping_type": "string", 
"mapping": {"type": "keyword"}}}, - {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, - ] - - ES_ITEMS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "properties": { - "type": "object", - "properties": { - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - "datetime": {"type": "date"}, - "start_datetime": {"type": "date"}, - "end_datetime": {"type": "date"}, - "created": {"type": "date"}, - "updated": {"type": "date"}, - # Satellite Extension https://github.com/stac-extensions/sat - "sat:absolute_orbit": {"type": "integer"}, - "sat:relative_orbit": {"type": "integer"}, - }, - }, - }, - } - - ES_COLLECTIONS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "extent.spatial.bbox": {"type": "long"}, - "extent.temporal.interval": {"type": "date"}, - "providers": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "item_assets": {"type": "object", "enabled": False}, - }, - } - - async def create_indexes(self): - """Create the index for Items and Collections.""" - client = AsyncElasticsearchSettings().create_client - await client.indices.create( - index=ITEMS_INDEX, - mappings=self.ES_ITEMS_MAPPINGS, - ignore=400, # ignore 400 already exists code - ) - await client.indices.create( - index=COLLECTIONS_INDEX, - mappings=self.ES_COLLECTIONS_MAPPINGS, - ignore=400, # ignore 400 already exists code - ) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py index f196ff7e..4f853a79 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py +++ 
b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py @@ -1,6 +1,6 @@ """Serializers.""" import abc -from typing import TypedDict +from typing import Any import attr @@ -15,7 +15,7 @@ class Serializer(abc.ABC): @classmethod @abc.abstractmethod - def db_to_stac(cls, item: dict, base_url: str) -> TypedDict: + def db_to_stac(cls, item: dict, base_url: str) -> Any: """Transform database model to stac.""" ... diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index d4621b90..b0028ec9 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -16,9 +16,8 @@ CoreClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_logic import COLLECTIONS_INDEX, ITEMS_INDEX +from stac_fastapi.elasticsearch.database_logic import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension -from stac_fastapi.elasticsearch.indexes import IndexesClient from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, SortExtension, @@ -104,12 +103,7 @@ async def delete_collections_and_items(txn_client: TransactionsClient) -> None: async def refresh_indices(txn_client: TransactionsClient) -> None: try: - await txn_client.database.client.indices.refresh(index=ITEMS_INDEX) - except Exception: - pass - - try: - await txn_client.database.client.indices.refresh(index=COLLECTIONS_INDEX) + await txn_client.database.client.indices.refresh(index="_all") except Exception: pass @@ -185,7 +179,7 @@ async def app(): @pytest_asyncio.fixture(scope="session") async def app_client(app): - await IndexesClient().create_indexes() + await create_collection_index() async with AsyncClient(app=app, base_url="http://test-server") as c: yield c diff --git a/stac_fastapi/elasticsearch/tests/resources/test_item.py b/stac_fastapi/elasticsearch/tests/resources/test_item.py index 8f7ea0c4..bb4c1835 100644 --- 
a/stac_fastapi/elasticsearch/tests/resources/test_item.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_item.py @@ -391,6 +391,14 @@ async def test_item_search_get_without_collections(app_client, ctx): assert resp.status_code == 200 +async def test_item_search_get_with_non_existent_collections(app_client, ctx): + """Test GET search with non-existent collections""" + + params = {"collections": "non-existent-collection,or-this-one"} + resp = await app_client.get("/search", params=params) + assert resp.status_code == 200 + + async def test_item_search_temporal_window_get(app_client, ctx): """Test GET search with spatio-temporal query (core)""" test_item = ctx.item @@ -491,7 +499,7 @@ async def test_item_search_get_query_extension(app_client, ctx): async def test_get_missing_item_collection(app_client): """Test reading a collection which does not exist""" resp = await app_client.get("/collections/invalid-collection/items") - assert resp.status_code == 200 + assert resp.status_code == 404 async def test_pagination_item_collection(app_client, ctx, txn_client):