Skip to content

Commit

Permalink
Merge branch 'main' into fix-get-sort
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhealy1 committed Nov 7, 2023
2 parents 6674fc2 + 8b12370 commit b5ec2b0
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
services:

elasticsearch_8_svc:
image: docker.elastic.co/elasticsearch/elasticsearch:8.1.3
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.4
env:
cluster.name: stac-cluster
node.name: es01
Expand Down
11 changes: 8 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Collection-level Assets to the CollectionSerializer [#148](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/148)

- Pagination for /collections - GET all collections - route [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164)
- Examples folder with example docker setup for running sfes from pip [#147](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/147)
- GET /search filter extension queries [#163](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/163)
- Added support for GET /search intersection queries [#158](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/158)

### Changed

- Update elasticsearch version from 8.1.3 to 8.10.4 in cicd, gh actions [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164)
- Updated core stac-fastapi libraries to 2.4.8 from 2.4.3 [#151](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/151)
- Use aliases on Elasticsearch indices, add number suffix in index name. [#152](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/152)

### Fixed

- Corrected the closing of client connections in ES index management functions [#132](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/132)
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,17 @@ curl -X "POST" "http://localhost:8080/collections" \
```

Note: this "Collections Transaction" behavior is not part of the STAC API, but may be soon.


## Collection pagination

The collections route handles optional `limit` and `token` parameters. The `links` field that is
returned from the `/collections` route contains a `next` link with the token that can be used to
get the next page of results.

```shell
curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token"
```

## Testing

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ services:

elasticsearch:
container_name: es-container
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.1.3}
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.10.4}
environment:
ES_JAVA_OPTS: -Xms512m -Xmx1g
volumes:
Expand Down
1 change: 1 addition & 0 deletions stac_fastapi/elasticsearch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"elasticsearch-dsl==7.4.1",
"pystac[validation]",
"uvicorn",
"orjson",
"overrides",
"starlette",
"geojson-pydantic",
Expand Down
117 changes: 77 additions & 40 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
"""Item crud client."""
import json
import logging
import re
from base64 import urlsafe_b64encode
from datetime import datetime as datetime_type
from datetime import timezone
from typing import Any, Dict, List, Optional, Set, Type, Union
from urllib.parse import urljoin
from urllib.parse import unquote_plus, urljoin

import attr
import orjson
import stac_pydantic
from fastapi import HTTPException
from fastapi import HTTPException, Request
from overrides import overrides
from pydantic import ValidationError
from pygeofilter.backends.cql2_json import to_cql2
from pygeofilter.parsers.cql2_text import parse as parse_cql2_text
from stac_pydantic.links import Relations
from stac_pydantic.shared import MimeTypes
from starlette.requests import Request

from stac_fastapi.elasticsearch import serializers
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
Expand Down Expand Up @@ -80,30 +83,58 @@ async def all_collections(self, **kwargs) -> Collections:
Raises:
Exception: If any error occurs while reading the collections from the database.
"""
request: Request = kwargs["request"]
base_url = str(kwargs["request"].base_url)

limit = (
int(request.query_params["limit"])
if "limit" in request.query_params
else 10
)
token = (
request.query_params["token"] if "token" in request.query_params else None
)

hits = await self.database.get_all_collections(limit=limit, token=token)

next_search_after = None
next_link = None
if len(hits) == limit:
last_hit = hits[-1]
next_search_after = last_hit["sort"]
next_token = urlsafe_b64encode(
",".join(map(str, next_search_after)).encode()
).decode()
paging_links = PagingLinks(next=next_token, request=request)
next_link = paging_links.link_next()

links = [
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
]

if next_link:
links.append(next_link)

return Collections(
collections=[
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in await self.database.get_all_collections()
],
links=[
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
self.collection_serializer.db_to_stac(c["_source"], base_url=base_url)
for c in hits
],
links=links,
)

@overrides
Expand Down Expand Up @@ -274,9 +305,9 @@ def _return_date(interval_str):

return {"lte": end_date, "gte": start_date}

@overrides
async def get_search(
self,
request: Request,
collections: Optional[List[str]] = None,
ids: Optional[List[str]] = None,
bbox: Optional[List[NumType]] = None,
Expand All @@ -287,8 +318,8 @@ async def get_search(
fields: Optional[List[str]] = None,
sortby: Optional[str] = None,
intersects: Optional[str] = None,
# filter: Optional[str] = None, # todo: requires fastapi > 2.3 unreleased
# filter_lang: Optional[str] = None, # todo: requires fastapi > 2.3 unreleased
filter: Optional[str] = None,
filter_lang: Optional[str] = None,
**kwargs,
) -> ItemCollection:
"""Get search results from the database.
Expand Down Expand Up @@ -318,17 +349,24 @@ async def get_search(
"bbox": bbox,
"limit": limit,
"token": token,
"query": json.loads(query) if query else query,
"query": orjson.loads(query) if query else query,
}

# this is borrowed from stac-fastapi-pgstac
# Kludgy fix because using factory does not allow an alias for filter-lang
query_params = str(request.query_params)
if filter_lang is None:
match = re.search(r"filter-lang=([a-z0-9-]+)", query_params, re.IGNORECASE)
if match:
filter_lang = match.group(1)

if datetime:
base_args["datetime"] = datetime

if intersects:
base_args["intersects"] = intersects
base_args["intersects"] = orjson.loads(unquote_plus(intersects))

if sortby:
# https://github.com/radiantearth/stac-spec/tree/master/api-spec/extensions/sort#http-get-or-post-form
sort_param = []
for sort in sortby:
sort_param.append(
Expand All @@ -340,12 +378,13 @@ async def get_search(
print(sort_param)
base_args["sortby"] = sort_param

# todo: requires fastapi > 2.3 unreleased
# if filter:
# if filter_lang == "cql2-text":
# base_args["filter-lang"] = "cql2-json"
# base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter)))
# print(f'>>> {base_args["filter"]}')
if filter:
if filter_lang == "cql2-text":
base_args["filter-lang"] = "cql2-json"
base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter)))
else:
base_args["filter-lang"] = "cql2-json"
base_args["filter"] = orjson.loads(unquote_plus(filter))

if fields:
includes = set()
Expand All @@ -364,13 +403,12 @@ async def get_search(
search_request = self.post_request_model(**base_args)
except ValidationError:
raise HTTPException(status_code=400, detail="Invalid parameters provided")
resp = await self.post_search(search_request, request=kwargs["request"])
resp = await self.post_search(search_request=search_request, request=request)

return resp

@overrides
async def post_search(
self, search_request: BaseSearchPostRequest, **kwargs
self, search_request: BaseSearchPostRequest, request: Request
) -> ItemCollection:
"""
Perform a POST search on the catalog.
Expand All @@ -385,7 +423,6 @@ async def post_search(
Raises:
HTTPException: If there is an error with the cql2_json filter.
"""
request: Request = kwargs["request"]
base_url = str(request.base_url)

search = self.database.make_search()
Expand Down Expand Up @@ -472,7 +509,7 @@ async def post_search(
filter_kwargs = search_request.fields.filter_fields

items = [
json.loads(stac_pydantic.Item(**feat).json(**filter_kwargs))
orjson.loads(stac_pydantic.Item(**feat).json(**filter_kwargs))
for feat in items
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
":",
}

DEFAULT_INDICES = f"*,-*kibana*,-{COLLECTIONS_INDEX}"
ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*"

DEFAULT_SORT = {
"properties.datetime": {"order": "desc"},
Expand Down Expand Up @@ -164,7 +164,7 @@ def indices(collection_ids: Optional[List[str]]) -> str:
A string of comma-separated index names. If `collection_ids` is None, returns the default indices.
"""
if collection_ids is None:
return DEFAULT_INDICES
return ITEM_INDICES
else:
return ",".join([index_by_collection_id(c) for c in collection_ids])

Expand All @@ -178,7 +178,8 @@ async def create_collection_index() -> None:
client = AsyncElasticsearchSettings().create_client

await client.indices.create(
index=COLLECTIONS_INDEX,
index=f"{COLLECTIONS_INDEX}-000001",
aliases={COLLECTIONS_INDEX: {}},
mappings=ES_COLLECTIONS_MAPPINGS,
ignore=400, # ignore 400 already exists code
)
Expand All @@ -197,9 +198,11 @@ async def create_item_index(collection_id: str):
"""
client = AsyncElasticsearchSettings().create_client
index_name = index_by_collection_id(collection_id)

await client.indices.create(
index=index_by_collection_id(collection_id),
index=f"{index_by_collection_id(collection_id)}-000001",
aliases={index_name: {}},
mappings=ES_ITEMS_MAPPINGS,
settings=ES_ITEMS_SETTINGS,
ignore=400, # ignore 400 already exists code
Expand All @@ -215,7 +218,14 @@ async def delete_item_index(collection_id: str):
"""
client = AsyncElasticsearchSettings().create_client

await client.indices.delete(index=index_by_collection_id(collection_id))
name = index_by_collection_id(collection_id)
resolved = await client.indices.resolve_index(name=name)
if "aliases" in resolved and resolved["aliases"]:
[alias] = resolved["aliases"]
await client.indices.delete_alias(index=alias["indices"], name=alias["name"])
await client.indices.delete(index=alias["indices"])
else:
await client.indices.delete(index=name)
await client.close()


Expand Down Expand Up @@ -295,21 +305,34 @@ class DatabaseLogic:

"""CORE LOGIC"""

async def get_all_collections(self) -> Iterable[Dict[str, Any]]:
async def get_all_collections(
self, token: Optional[str], limit: int
) -> Iterable[Dict[str, Any]]:
"""Retrieve a list of all collections from the database.
Args:
token (Optional[str]): The token used to return the next set of results.
limit (int): Number of results to return
Returns:
collections (Iterable[Dict[str, Any]]): A list of dictionaries containing the source data for each collection.
Notes:
The collections are retrieved from the Elasticsearch database using the `client.search` method,
with the `COLLECTIONS_INDEX` as the target index and `size=1000` to retrieve up to 1000 records.
with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records.
The result is a generator of dictionaries containing the source data for each collection.
"""
# https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/65
# collections should be paginated, but at least return more than the default 10 for now
collections = await self.client.search(index=COLLECTIONS_INDEX, size=1000)
return (c["_source"] for c in collections["hits"]["hits"])
search_after = None
if token:
search_after = urlsafe_b64decode(token.encode()).decode().split(",")
collections = await self.client.search(
index=COLLECTIONS_INDEX,
search_after=search_after,
size=limit,
sort={"id": {"order": "asc"}},
)
hits = collections["hits"]["hits"]
return hits

async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
"""Retrieve a single item from the database.
Expand Down Expand Up @@ -773,14 +796,11 @@ async def bulk_async(
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
index is refreshed after the bulk insert. The function does not return any value.
"""
await asyncio.get_event_loop().run_in_executor(
None,
lambda: helpers.bulk(
self.sync_client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
),
await helpers.async_bulk(
self.client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
)

def bulk_sync(
Expand Down Expand Up @@ -811,7 +831,7 @@ def bulk_sync(
async def delete_items(self) -> None:
"""Danger. this is only for tests."""
await self.client.delete_by_query(
index=DEFAULT_INDICES,
index=ITEM_INDICES,
body={"query": {"match_all": {}}},
wait_for_completion=True,
)
Expand Down
Loading

0 comments on commit b5ec2b0

Please sign in to comment.