Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serverless ES support (a working draft) #271

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,25 @@
### To install from PyPI:

```shell
pip install stac_fastapi.elasticsearch
pip install "stac_fastapi.elasticsearch[server,es]"

```
If you are using Elasticsearch Serverless:

```shell
pip install "stac_fastapi.elasticsearch[server,serverless_es]"

```
or
```
pip install stac_fastapi.opensearch
```

To install from source:
```
pip install --no-cache-dir -e "./stac_fastapi/elasticsearch[dev,server,serverless_es]"

```
## Build Elasticsearch API backend

```shell
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
restart: always
build:
context: .
dockerfile: dockerfiles/Dockerfile.dev.es
dockerfile: dockerfiles/Dockerfile.dev.serverless.es
environment:
- STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch
- STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend
Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile.dev.es
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ WORKDIR /app
COPY . /app

RUN pip install --no-cache-dir -e ./stac_fastapi/core
RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server]
RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,es]
19 changes: 19 additions & 0 deletions dockerfiles/Dockerfile.dev.serverless.es
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM python:3.10-slim


# update apt pkgs, and install build-essential for ciso8601
RUN apt-get update && \
apt-get -y upgrade && \
apt-get install -y build-essential git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# update certs used by Requests
ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt

WORKDIR /app

COPY . /app

RUN pip install --no-cache-dir -e ./stac_fastapi/core
RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,serverless_es]
16 changes: 14 additions & 2 deletions stac_fastapi/elasticsearch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@

install_requires = [
"stac-fastapi.core==3.0.0",
"elasticsearch[async]==8.11.0",
"elasticsearch-dsl==8.11.0",
"uvicorn",
"starlette",
]

install_requires_es = [
"elasticsearch[async]==8.11.0",
"elasticsearch-dsl==8.11.0",
]

install_requires_serverless_es = [
"elasticsearch_serverless[async]",
"elasticsearch-dsl",
"requests",
]

extra_reqs = {
"dev": [
"pytest",
Expand All @@ -22,9 +31,12 @@
"requests",
"ciso8601",
"httpx",
"debugpy",
],
"docs": ["mkdocs", "mkdocs-material", "pdocs"],
"server": ["uvicorn[standard]==0.19.0"],
"es": install_requires_es,
"serverless_es": install_requires_serverless_es,
}

setup(
Expand Down
53 changes: 51 additions & 2 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,44 @@
from typing import Any, Dict, Set

import certifi
import requests

# RIGHT https://elasticsearch-py.readthedocs.io/en/latest/async.html
# https://elasticsearch-serverless-python.readthedocs.io/en/stable/api.html#module-elasticsearch_serverless
# from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore
from elasticsearch_serverless import AsyncElasticsearch, Elasticsearch

from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore
from stac_fastapi.types.config import ApiSettings

# WRONG https://elasticsearch-serverless-python.readthedocs.io/en/latest/api.html#elasticsearch_serverless.client.AsyncSearchClient
# from elasticsearch_serverless.client import AsyncSearchClient


def check_serverless_elasticsearch(timeout: float = 10.0):
    """Check whether the configured Elasticsearch endpoint is serverless.

    Sends a GET request to the cluster root URL built from the ``ES_USE_SSL``,
    ``ES_HOST`` and ``ES_PORT`` environment variables and inspects
    ``version.build_flavor`` in the JSON response.

    This function is called at module import time, so it must never hang or
    raise: every failure is reported through the returned tuple instead.

    Args:
        timeout: Seconds to wait for the endpoint before giving up.

    Returns:
        A ``(serverless, message)`` tuple. ``serverless`` is ``True`` only when
        the endpoint answered successfully and reported the ``"serverless"``
        build flavor; ``message`` is a short human-readable explanation.
    """
    use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true"
    scheme = "https" if use_ssl else "http"

    # Configure the host URL with the correct scheme
    host = f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"

    # Only send credentials when a key is configured; otherwise the literal
    # string "ApiKey None" would be sent as the Authorization header.
    api_key = os.getenv("ES_API_KEY")
    headers = {"Authorization": f"ApiKey {api_key}"} if api_key else {}

    try:
        # Without a timeout an unreachable host would block the import forever.
        response = requests.get(host, headers=headers, timeout=timeout)
    except requests.RequestException:
        return False, "Error accessing Elasticsearch endpoint"

    if not response.ok:
        return False, "Error accessing Elasticsearch endpoint"

    try:
        data = response.json()
    except ValueError:
        # The endpoint answered, but not with JSON (e.g. a proxy error page).
        return False, "No serverless indicator found"

    # Look for the serverless indicator in the response, tolerating payloads
    # whose shape differs from the expected {"version": {...}} object.
    version = data.get("version") if isinstance(data, dict) else None
    if isinstance(version, dict):
        if version.get("build_flavor", "") == "serverless":
            return True, "Serverless Elasticsearch found"
    return False, "No serverless indicator found"


serverless, message = check_serverless_elasticsearch()


def _es_config() -> Dict[str, Any]:
# Determine the scheme (http or https)
Expand All @@ -19,9 +53,24 @@ def _es_config() -> Dict[str, Any]:
hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"]

# Initialize the configuration dictionary
accept = None
if serverless:
accept = "application/vnd.elasticsearch+json; compatible-with=8"
else:
accept = "application/vnd.elasticsearch+json; compatible-with=7"

headers = {"accept": accept}

# Handle API key
api_key = os.getenv("ES_API_KEY")
if serverless:
headers.update({"Authorization": f"ApiKey {api_key}"})
else:
headers.update({"x-api-key": api_key})

config = {
"hosts": hosts,
"headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"},
"headers": headers,
}

# Handle API key
Expand Down
162 changes: 114 additions & 48 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
from stac_fastapi.elasticsearch.config import (
ElasticsearchSettings as SyncElasticsearchSettings,
)
from stac_fastapi.elasticsearch.config import check_serverless_elasticsearch
from stac_fastapi.types.errors import ConflictError, NotFoundError
from stac_fastapi.types.stac import Collection, Item

logger = logging.getLogger(__name__)

NumType = Union[float, int]

serverless, message = check_serverless_elasticsearch()
logger.info(message)

COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections")
ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_")
ES_INDEX_NAME_UNSUPPORTED_CHARS = {
Expand Down Expand Up @@ -180,23 +184,49 @@ async def create_index_templates() -> None:
None

"""
client = AsyncElasticsearchSettings().create_client
await client.indices.put_template(
name=f"template_{COLLECTIONS_INDEX}",
body={
"index_patterns": [f"{COLLECTIONS_INDEX}*"],
"mappings": ES_COLLECTIONS_MAPPINGS,
},
)
await client.indices.put_template(
name=f"template_{ITEMS_INDEX_PREFIX}",
body={
"index_patterns": [f"{ITEMS_INDEX_PREFIX}*"],
"settings": ES_ITEMS_SETTINGS,
"mappings": ES_ITEMS_MAPPINGS,
},
)
await client.close()
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-component-template.html
# no async client is provided for index management over serverless
# the body also changes and the name can't start with an underscore '_'
if serverless:
client = SyncElasticsearchSettings().create_client
client.indices.put_index_template(
name=f"template_{COLLECTIONS_INDEX}",
body={
"index_patterns": [f"{COLLECTIONS_INDEX}*"],
"template": {
"mappings": ES_COLLECTIONS_MAPPINGS,
},
},
)
client.indices.put_index_template(
name=f"index_template_{ITEMS_INDEX_PREFIX}",
body={
"index_patterns": [f"{ITEMS_INDEX_PREFIX}*"],
"template": {
"settings": ES_ITEMS_SETTINGS,
"mappings": ES_ITEMS_MAPPINGS,
},
},
)
client.close()
else:
client = AsyncElasticsearchSettings().create_client
await client.indices.put_template(
name=f"template_{COLLECTIONS_INDEX}",
body={
"index_patterns": [f"{COLLECTIONS_INDEX}*"],
"mappings": ES_COLLECTIONS_MAPPINGS,
},
)
await client.indices.put_template(
name=f"template_{ITEMS_INDEX_PREFIX}",
body={
"index_patterns": [f"{ITEMS_INDEX_PREFIX}*"],
"settings": ES_ITEMS_SETTINGS,
"mappings": ES_ITEMS_MAPPINGS,
},
)
await client.close()


async def create_collection_index() -> None:
Expand All @@ -207,13 +237,23 @@ async def create_collection_index() -> None:
None

"""
client = AsyncElasticsearchSettings().create_client
index = f"{COLLECTIONS_INDEX}-000001"
if serverless:
client = SyncElasticsearchSettings().create_client

await client.options(ignore_status=400).indices.create(
index=f"{COLLECTIONS_INDEX}-000001",
aliases={COLLECTIONS_INDEX: {}},
)
await client.close()
client.options(ignore_status=400).indices.create(
index=index,
aliases={COLLECTIONS_INDEX: {}},
)
client.close()
else:
client = AsyncElasticsearchSettings().create_client

await client.options(ignore_status=400).indices.create(
index=index,
aliases={COLLECTIONS_INDEX: {}},
)
await client.close()


async def create_item_index(collection_id: str):
Expand All @@ -227,14 +267,25 @@ async def create_item_index(collection_id: str):
None

"""
client = AsyncElasticsearchSettings().create_client
index_name = index_by_collection_id(collection_id)
index = f"{index_by_collection_id(collection_id)}-000001"

await client.options(ignore_status=400).indices.create(
index=f"{index_by_collection_id(collection_id)}-000001",
aliases={index_name: {}},
)
await client.close()
if serverless:
client = SyncElasticsearchSettings().create_client

client.options(ignore_status=400).indices.create(
index=index,
aliases={index_name: {}},
)
client.close()
else:
client = AsyncElasticsearchSettings().create_client

await client.options(ignore_status=400).indices.create(
index=index,
aliases={index_name: {}},
)
await client.close()


async def delete_item_index(collection_id: str):
Expand All @@ -243,17 +294,32 @@ async def delete_item_index(collection_id: str):
Args:
collection_id (str): The ID of the collection whose items index will be deleted.
"""
client = AsyncElasticsearchSettings().create_client

name = index_by_collection_id(collection_id)
resolved = await client.indices.resolve_index(name=name)
if "aliases" in resolved and resolved["aliases"]:
[alias] = resolved["aliases"]
await client.indices.delete_alias(index=alias["indices"], name=alias["name"])
await client.indices.delete(index=alias["indices"])

if serverless:
client = SyncElasticsearchSettings().create_client

resolved = client.indices.resolve_index(name=name)
if "aliases" in resolved and resolved["aliases"]:
[alias] = resolved["aliases"]
client.indices.delete_alias(index=alias["indices"], name=alias["name"])
client.indices.delete(index=alias["indices"])
else:
client.indices.delete(index=name)
client.close()
else:
await client.indices.delete(index=name)
await client.close()
client = AsyncElasticsearchSettings().create_client

resolved = await client.indices.resolve_index(name=name)
if "aliases" in resolved and resolved["aliases"]:
[alias] = resolved["aliases"]
await client.indices.delete_alias(
index=alias["indices"], name=alias["name"]
)
await client.indices.delete(index=alias["indices"])
else:
await client.indices.delete(index=name)
await client.close()


def mk_item_id(item_id: str, collection_id: str):
Expand Down Expand Up @@ -329,20 +395,20 @@ async def get_all_collections(
Returns:
A tuple of (collections, next pagination token if any).
"""
body = {
"sort": [{"id": {"order": "asc"}}],
"size": limit,
}
search_after = None
if token:
search_after = [token]
body["search_after"] = search_after

response = await self.client.search(
index=COLLECTIONS_INDEX,
body={
"sort": [{"id": {"order": "asc"}}],
"size": limit,
"search_after": search_after,
},
)

hits = response["hits"]["hits"]
response = await self.client.search(index=COLLECTIONS_INDEX, body=body)
if serverless:
hits = response.body["hits"]["hits"]
else:
hits = response["hits"]["hits"]
collections = [
self.collection_serializer.db_to_stac(
collection=hit["_source"], request=request, extensions=self.extensions
Expand Down