-
Notifications
You must be signed in to change notification settings - Fork 28
Search optimization and indexing based on datetime #405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Search optimization and indexing based on datetime #405
Conversation
38c85e4
to
11e40c4
Compare
295d3d6
to
243dd1c
Compare
@jonhealy1 The MR is already finished and ready for code review. |
@GrzegorzPustulka There's a couple of conflicts now. They don't look too bad. I have been travelling but am going to try to review this in the next few days, |
@jamesfisher-geo @StijnCaerts @rhysrevans3 Hi. Added you guys as reviewers if you have time to have a look :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks okay to me but I have a couple of questions.
logger.error(f"Invalid interval format: {datetime}, error: {e}") | ||
datetime_search = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this error be returned to the user rather than continuing the search without a datetime filter?
except (ValueError, TypeError) as e: | ||
# Handle invalid interval formats if return_date fails | ||
logger.error( | ||
f"Invalid interval format: {search_request.datetime}, error: {e}" | ||
) | ||
datetime_search = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
def create_index_name(collection_id: str, start_date: str) -> str: | ||
"""Create index name from collection ID and start date. | ||
Args: | ||
collection_id (str): Collection identifier. | ||
start_date (str): Start date for the index. | ||
Returns: | ||
str: Formatted index name. | ||
""" | ||
cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) | ||
return f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{start_date}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the equivalent of index_by_collection_id
for the simple method? If it is should it not also include the hex of the collection_id
and -000001
?
What's the benefit of having the start datetime in the index name could you just have it in the alias with the end datetime? You could just use a count to prevent index name clashes.
You would then only need to create a new index when you exceed the max size and not for earlier items. If the item's start datetime is earlier or the end datetime is later than the current alias then update the alias.
def __init__(self, cache_ttl_seconds: int = 3600): | ||
"""Initialize the cache manager. | ||
Args: | ||
cache_ttl_seconds (int): Time-to-live for cache entries in seconds. | ||
""" | ||
self._cache: Optional[Dict[str, List[str]]] = None | ||
self._timestamp: float = 0 | ||
self._ttl = cache_ttl_seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to just update the cache as aliases are set/updated rather than polling ES every hour?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks great. I've got some comments around error handling and some cache handling as well.
This PR will add a lot of future maintenance burden in it's current form. How about we implement only in async
and not include the sync
code. That would cut down on repetitive code in this PR.
@jonhealy1 @GrzegorzPustulka what are your thoughts on this?
@@ -342,6 +348,7 @@ async def item_collection( | |||
sort=None, | |||
token=token, | |||
collection_ids=[collection_id], | |||
datetime_search=datetime_search, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? We apply the datetime_search
to the search
variable on line 331. If this is optional, could we omit it?
@@ -560,6 +574,7 @@ async def post_search( | |||
token=search_request.token, | |||
sort=sort, | |||
collection_ids=search_request.collections, | |||
datetime_search=datetime_search, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here -- Is this needed? We apply the datetime_search
to the search
variable on line 513. If this is optional, could we omit it?
class ElasticsearchAdapter(SearchEngineAdapter): | ||
"""Elasticsearch-specific adapter implementation.""" | ||
|
||
async def create_simple_index(self, client: Any, collection_id: str) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index mappings and setting are missing from ElasticsearchAdapter().create_simple_index()
. Could you include the mappings here like is done in OpenSearchAdapter()._create_index_body()
The patterns for creating an index should be the same between ElasticsearchAdapter()
and OpenSearchAdapter()
IMO. How about creating a _create_index_body()
method in ElasticsearchAdapter()
?
Returns: | ||
SearchEngineType: Detected engine type. | ||
""" | ||
return ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using isInstance()
here rather than matching the string?
return (
OpenSearchAdapter()
if isInstance(client, (OpenSearch, AsyncOpenSearch))
else ElasticsearchAdapter()
)
"""Factory for creating search engine adapters.""" | ||
|
||
@staticmethod | ||
def create_adapter(engine_type: SearchEngineType) -> SearchEngineAdapter: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this function necessary? See comment below
) | ||
return product_datetime | ||
|
||
async def handle_new_collection( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logging statements in handle_new_collection() and handle_new_collection_sync() would be useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely think we need to do a better job at logging on this project.
|
||
_instance = None | ||
|
||
def __new__(cls, client): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused with this implementation. Maybe I am missing something. Could this be replaced with the normal method of instance creation using __init__()
def __init__(self, client: Any):
self.cache_manager = IndexCacheManager()
self.alias_loader = AsyncIndexAliasLoader(client, self.cache_manager)
class IndexCacheManager: | ||
"""Manages caching of index aliases with expiration.""" | ||
|
||
def __init__(self, cache_ttl_seconds: int = 3600): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe some concurrency management is needed here because multiple threads may be attempting to access the cache resource at the same time. From what I have found threading.Lock()
should work.
https://docs.python.org/3/library/threading.html#lock-objects
The following (untested) should place a lock on the cache when accessing it and release it when finished
import threading
class IndexCacheManager:
def __init__(self, cache_ttl_seconds: int = 3600):
self._cache: Optional[Dict[str, List[str]]] = None
self._timestamp: float = 0
self._ttl = cache_ttl_seconds
self._lock = threading.Lock()
def get_cache(self) -> Optional[Dict[str, List[str]]]:
"""Get the current cache if not expired.
Returns:
Optional[Dict[str, List[str]]]: Cache data if valid, None if expired.
"""
with self._lock:
if self.is_expired:
return None
return self._cache
""" | ||
if self.is_expired: | ||
return None | ||
return self._cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning the _cache
object here could be problematic because it is a pointer to the actual cache. How about returning a copy?
return {k: v.copy() for k, v in self._cache.items()}
return ( | ||
SyncDatetimeBasedIndexSelector(sync_client) | ||
if use_datetime_filtering | ||
else UnfilteredIndexSelector() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the UnfilteredIndexSelector()
is async
I'll improve all the comments in the coming days, remove the sync versions, and fix the bugs my friend found testing this MR |
@@ -998,6 +1005,9 @@ async def _search_and_get_ids( | |||
async def test_search_datetime_with_null_datetime( | |||
app_client, txn_client, load_test_data | |||
): | |||
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): | |||
pytest.skip() | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this right? This test should definitely run in default mode.
@GrzegorzPustulka Can we set ENABLE_DATETIME_INDEX_FILTERING for the associated tests and then turn it off for the default tests? |
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional | | ||
| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. If set to `false`, the POST `/collections` route and related transaction endpoints (including bulk transaction operations) will be unavailable in the API. This is useful for deployments where mutating the catalog via the API should be prevented. | `true` | Optional | | ||
| `ENABLE_DATETIME_INDEX_FILTERING` | Enable datetime-based index selection using collection IDs. Requires indexes in format: STAC_ITEMS_INDEX_PREFIX_collection-id_start_year-start_month-start_day-end_year-end_month-end_day, e.g. items_sentinel-2-l2a_2025-06-06-2025-09-22. | `false` | Optional | | ||
| `DATETIME_INDEX_MAX_SIZE_GB` | Maximum size limit in GB for datetime-based indexes. When an index exceeds this size, a new time-partitioned index will be created. Note: This value should account for ~25% overhead due to OS/ES caching of data structures and metadata. Only applies when`ENABLE_DATETIME_INDEX_FILTERING` is enabled. | `25` | Optional | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are important additions and maybe should have their own section in the readme for a better explanation.
Related Issue(s):
Index Management System with Time-based Partitioning
Description
This PR introduces a new index management system that enables automatic index partitioning based on dates and index size control with automatic splitting.
How it works
System Architecture
The system consists of several main components:
1. Search Engine Adapters
SearchEngineAdapter
- base classElasticsearchAdapter
andOpenSearchAdapter
- implementations for specific engines2. Index Selection Strategies
AsyncDatetimeBasedIndexSelector
/SyncDatetimeBasedIndexSelector
- date-based index filteringUnfilteredIndexSelector
- returns all indexes (fallback)3. Data Insertion Strategies
Datetime Strategy - Operation Details
Index Format:
Item Insertion Process:
properties.datetime
)DATETIME_INDEX_MAX_SIZE_GB
) - splits indexEarly Date Handling:
If item has date earlier than oldest index:
Index Splitting:
When index exceeds size limit:
Cache and Performance
IndexCacheManager:
AsyncIndexAliasLoader / SyncIndexAliasLoader:
Configuration
New Environment Variables:
Usage Examples
Scenario 1: Adding items to new collection
2025-01-15
→ creates indexitems_collection_2025-01-15
Scenario 2: Size limit exceeded
items_collection_2025-01-01
reaches 25GB2025-03-15
→ system splits index:items_collection_2025-01-01-2025-03-15
items_collection_2025-03-16
Scenario 3: Item with early date
items_collection_2025-02-01
2024-12-15
→ creates:items_collection_2024-12-15-2025-01-31
Search
System automatically filters indexes during search:
Query with date range:
Searches only indexes containing items from this period, instead of all collection indexes.
Factories
IndexSelectorFactory:
create_async_selector()
/create_sync_selector()
IndexInsertionFactory:
SearchEngineAdapterFactory:
Backward Compatibility
ENABLE_DATETIME_INDEX_FILTERING=false
→ works as beforeAll operations have sync and async versions for different usage contexts in the application.
PR Checklist:
pre-commit run --all-files
)make test
)