Skip to content

Commit

Permalink
Pv/one index per collection (#97)
Browse files Browse the repository at this point in the history
* refactor to use one index per collection

* restrict queries involving a collection to only those indices
  • Loading branch information
Phil Varner authored Apr 27, 2022
1 parent 5f9e145 commit 3be5e1d
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 224 deletions.
16 changes: 11 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ repos:
# W503 black conflicts with "line break before operator" rule
# E203 black conflicts with "whitespace before ':'" rule
'--ignore=E501,W503,E203,C901' ]
# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v0.942
# hooks:
# - id: mypy
# args: [--no-strict-optional, --ignore-missing-imports]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.942
hooks:
- id: mypy
exclude: /tests/
# --strict
args: [--no-strict-optional, --ignore-missing-imports, --implicit-reexport]
additional_dependencies: [
"types-attrs",
"types-requests"
]
- repo: https://github.com/PyCQA/pydocstyle
rev: 6.1.1
hooks:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Default to Python 3.10
- Default to Elasticsearch 8.x
- Collection objects are now stored in `collections` index rather than `stac_collections` index
- Item objects are no longer stored in `stac_items`, but in per-collection indices named `items_{collection_id}`

### Removed

Expand Down
8 changes: 4 additions & 4 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
CoreClient,
TransactionsClient,
)
from stac_fastapi.elasticsearch.database_logic import create_collection_index
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.extensions.core import ( # FieldsExtension,
ContextExtension,
Expand Down Expand Up @@ -44,11 +44,11 @@


@app.on_event("startup")
async def _startup_event():
await IndexesClient().create_indexes()
async def _startup_event() -> None:
await create_collection_index()


def run():
def run() -> None:
"""Run app from command line using uvicorn if available."""
try:
import uvicorn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from typing import Any, Dict, Set

from elasticsearch import AsyncElasticsearch, Elasticsearch
from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore
from stac_fastapi.types.config import ApiSettings


Expand Down
68 changes: 40 additions & 28 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,30 @@ class CoreClient(AsyncBaseCoreClient):
async def all_collections(self, **kwargs) -> Collections:
"""Read all collections from the database."""
base_url = str(kwargs["request"].base_url)
collection_list = await self.database.get_all_collections()
collection_list = [
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in collection_list
]

links = [
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
]

return Collections(collections=collection_list, links=links)
return Collections(
collections=[
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in await self.database.get_all_collections()
],
links=[
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
],
)

@overrides
async def get_collection(self, collection_id: str, **kwargs) -> Collection:
Expand All @@ -100,6 +99,8 @@ async def item_collection(
limit=limit,
token=token,
sort=None,
collection_ids=[collection_id],
ignore_unavailable=False,
)

items = [
Expand Down Expand Up @@ -276,6 +277,7 @@ async def post_search(
limit=limit,
token=search_request.token, # type: ignore
sort=sort,
collection_ids=search_request.collections,
)

items = [
Expand Down Expand Up @@ -341,8 +343,11 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
processed_items = [
bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore
]

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]
await self.database.bulk_async(
processed_items, refresh=kwargs.get("refresh", False)
collection_id, processed_items, refresh=kwargs.get("refresh", False)
)

return None # type: ignore
Expand All @@ -355,12 +360,14 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
async def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
"""Update item."""
base_url = str(kwargs["request"].base_url)
collection_id = item["collection"]

now = datetime_type.now(timezone.utc).isoformat().replace("+00:00", "Z")
item["properties"]["updated"] = str(now)

await self.database.check_collection_exists(collection_id=item["collection"])
await self.database.check_collection_exists(collection_id)
# todo: index instead of delete and create
await self.delete_item(item_id=item["id"], collection_id=item["collection"])
await self.delete_item(item_id=item["id"], collection_id=collection_id)
await self.create_item(item=item, **kwargs)

return ItemSerializer.db_to_stac(item, base_url)
Expand Down Expand Up @@ -440,6 +447,11 @@ def bulk_item_insert(
self.preprocess_item(item, base_url) for item in items.items.values()
]

self.database.bulk_sync(processed_items, refresh=kwargs.get("refresh", False))
# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]

self.database.bulk_sync(
collection_id, processed_items, refresh=kwargs.get("refresh", False)
)

return f"Successfully added {len(processed_items)} Items."
Loading

0 comments on commit 3be5e1d

Please sign in to comment.