-
Notifications
You must be signed in to change notification settings - Fork 5
Improve registry-based archive generation reliability and performance #240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d83e2f5
e1934d3
0b025ef
f94e082
ca1aecc
89a9e5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,8 @@ | |
| from urllib.parse import urlparse | ||
|
|
||
| import requests | ||
| from requests.adapters import HTTPAdapter | ||
| from urllib3.util.retry import Retry | ||
|
|
||
| from . import VERSION | ||
| from .aip import writelabel as writeaiplabel | ||
|
|
@@ -74,6 +76,18 @@ | |
| _defaultserver = "https://pds.nasa.gov/api/search/1/" # Where to find the PDS API | ||
| _searchkey = "ops:Harvest_Info.ops:harvest_date_time" # How to sort products | ||
|
|
||
| # Retry Configuration | ||
| # ------------------- | ||
|
|
||
| _retryattempts = 5 # Maximum number of retry attempts | ||
| _retrybackoff = 2 # Exponential backoff factor (seconds) | ||
| _retrystatus = [ | ||
| HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| HTTPStatus.BAD_GATEWAY, | ||
| HTTPStatus.SERVICE_UNAVAILABLE, | ||
| HTTPStatus.GATEWAY_TIMEOUT, | ||
| ] # HTTP status codes to retry on | ||
|
|
||
|
|
||
| # PDS API property keys we're interested in | ||
| # ----------------------------------------- | ||
|
|
@@ -82,7 +96,9 @@ | |
| _propdatamd5 = "ops:Data_File_Info.ops:md5_checksum" | ||
| _proplabelurl = "ops:Label_File_Info.ops:file_ref" | ||
| _proplabelmd5 = "ops:Label_File_Info.ops:md5_checksum" | ||
| _fields = [_propdataurl, _propdatamd5, _proplabelurl, _proplabelmd5] | ||
| # Fields to request from API to minimize payload size | ||
| # _searchkey must be included because it's accessed in _getproducts() line 244 for pagination | ||
| _fields = [_propdataurl, _propdatamd5, _proplabelurl, _proplabelmd5, _searchkey] | ||
|
|
||
|
|
||
| # Program/Module Metadata | ||
|
|
@@ -117,6 +133,26 @@ def make(cls, url, md5): | |
| return cls(fixmultislashes(url), md5) | ||
|
|
||
|
|
||
| def _get_session_with_retry() -> requests.Session: | ||
| """Create a requests session configured with retry logic and exponential backoff. | ||
|
|
||
| Returns a session that will automatically retry on transient failures (500, 502, 503, 504) | ||
| with exponential backoff to handle API performance issues. | ||
| """ | ||
| session = requests.Session() | ||
| retry_strategy = Retry( | ||
| total=_retryattempts, | ||
| backoff_factor=_retrybackoff, | ||
| status_forcelist=_retrystatus, | ||
| allowed_methods=["GET"], # Only retry GET requests | ||
| raise_on_status=False, # Don't raise on bad status, let us handle it | ||
| ) | ||
| adapter = HTTPAdapter(max_retries=retry_strategy) | ||
| session.mount("http://", adapter) | ||
| session.mount("https://", adapter) | ||
| return session | ||
|
|
||
|
|
||
| def _deurnlidvid(lidvid: str) -> tuple[str, str]: | ||
| """De-URN a LID VID. | ||
|
|
||
|
|
@@ -156,10 +192,21 @@ def _getbundle(server_url: str, lidvid: str) -> Union[dict[str, Any], None]: | |
| identifier ``lidvid`` and return a ``dict`` with its attributes. | ||
| If it can't be found, return ``None``. | ||
| """ | ||
| r = requests.get(f"{server_url}/products/{lidvid}") | ||
| session = _get_session_with_retry() | ||
| url = f"{server_url}/products/{lidvid}" | ||
| _logger.debug('Fetching bundle/product from %s', url) | ||
| r = session.get(url) | ||
| if r.status_code == HTTPStatus.NOT_FOUND: | ||
| return None | ||
| return r.json() | ||
| if not r.ok: | ||
| _logger.error("⚠️ Failed to fetch %s: HTTP %d", url, r.status_code) | ||
| r.raise_for_status() | ||
| try: | ||
| return r.json() | ||
| except requests.exceptions.JSONDecodeError as e: | ||
| _logger.error("⚠️ Failed to parse JSON response from %s: %s", url, e) | ||
| _logger.debug("Response content: %s", r.text[:500]) | ||
| raise ValueError(f"Invalid JSON response from {url}: {e}") from e | ||
|
|
||
|
|
||
| def _getproducts(server_url: str, lidvid: str, allcollections=True) -> Iterator[dict[str, Any]]: | ||
|
|
@@ -170,14 +217,27 @@ def _getproducts(server_url: str, lidvid: str, allcollections=True) -> Iterator[ | |
| If ``allcollections`` is True, then return all collections for LID-only references; otherwise | ||
| return just the latest collection for LID-only references (has no effect on full LIDVID-references). | ||
| """ | ||
| session = _get_session_with_retry() | ||
| # Commenting out `all` vs. `latest` functionality for now since the API does not support it at this time | ||
| # url = f"{server_url}/products/{lidvid}/members/{'all' if allcollections else 'latest'}" | ||
| url = f"{server_url}/products/{lidvid}/members" | ||
| params = {"sort": _searchkey, "limit": _apiquerylimit} | ||
| # Request only the fields we need to minimize payload size | ||
| params = {"sort": _searchkey, "limit": _apiquerylimit, "fields": ",".join(_fields)} | ||
|
||
| while True: | ||
| _logger.debug('Making request to %s with params %r', url, params) | ||
| r = requests.get(url, params=params) # type: ignore | ||
| matches = r.json()["data"] | ||
| r = session.get(url, params=params) # type: ignore | ||
| if not r.ok: | ||
| _logger.error("⚠️ Failed to fetch products from %s: HTTP %d", url, r.status_code) | ||
| r.raise_for_status() | ||
| try: | ||
| data = r.json() | ||
| except requests.exceptions.JSONDecodeError as e: | ||
| _logger.error("⚠️ Failed to parse JSON response from %s: %s", url, e) | ||
| _logger.debug("Response content: %s", r.text[:500]) | ||
| raise ValueError(f"Invalid JSON response from {url}: {e}") from e | ||
(Review note: jordanpadams marked this conversation as resolved — resolved comments hidden.)
|
||
| if "data" not in data: | ||
| _logger.warning('Response missing expected "data" key from %s', url) | ||
| matches = data.get("data", []) | ||
| num_matches = len(matches) | ||
| for i in matches: | ||
| yield i | ||
|
|
@@ -186,6 +246,48 @@ def _getproducts(server_url: str, lidvid: str, allcollections=True) -> Iterator[ | |
| params["search-after"] = matches[-1]["properties"][_searchkey] | ||
|
|
||
|
|
||
| def _getcollections_batch(server_url: str, collection_lidvids: list[str]) -> Iterator[dict[str, Any]]: | ||
| """Get multiple collections efficiently by batching requests. | ||
|
|
||
| Using the PDS API server at ``server_url``, fetch all collections matching the given | ||
| ``collection_lidvids`` by batching them into groups of _apiquerylimit (50) and making | ||
| one API call per batch using OR queries. | ||
|
|
||
| Yields collection dictionaries. Collections not found will be omitted from results. | ||
| """ | ||
| if not collection_lidvids: | ||
| return | ||
|
|
||
| session = _get_session_with_retry() | ||
|
|
||
| # Process collections in batches of _apiquerylimit to avoid query string limits | ||
| for i in range(0, len(collection_lidvids), _apiquerylimit): | ||
| batch = collection_lidvids[i:i + _apiquerylimit] | ||
| # Build query: (lidvid eq "urn:..." or lidvid eq "urn:..." or ...) | ||
| or_conditions = " or ".join([f'lidvid eq "{lidvid}"' for lidvid in batch]) | ||
| query = f"({or_conditions})" | ||
|
|
||
| url = f"{server_url}/products" | ||
| params = {"q": query, "limit": _apiquerylimit} | ||
|
|
||
| _logger.debug('Batch fetching %d collections (batch %d-%d of %d)', | ||
| len(batch), i + 1, i + len(batch), len(collection_lidvids)) | ||
| r = session.get(url, params=params) # type: ignore | ||
| if not r.ok: | ||
| _logger.error("⚠️ Failed to batch fetch collections: HTTP %d", r.status_code) | ||
| r.raise_for_status() | ||
| try: | ||
| data = r.json() | ||
| except requests.exceptions.JSONDecodeError as e: | ||
| _logger.error("⚠️ Failed to parse JSON response from batch query: %s", e) | ||
| _logger.debug("Response content: %s", r.text[:500]) | ||
| raise ValueError(f"Invalid JSON response from batch query: {e}") from e | ||
|
|
||
| matches = data.get("data", []) | ||
| for collection in matches: | ||
| yield collection | ||
|
|
||
|
|
||
| def _addfiles(product: dict, bac: dict): | ||
| """Add the PDS files described in the PDS ``product`` to the ``bac``.""" | ||
| lidvid, props = product["id"], product["properties"] | ||
|
|
@@ -228,10 +330,15 @@ def _comprehendregistry(url: str, bundlelidvid: str, allcollections=True) -> tup | |
| title = bundle.get("title", "«unknown»") | ||
| _addfiles(bundle, bac) | ||
|
|
||
| # It turns out the PDS registry makes this *trivial* compared to the PDS filesystem version; | ||
| # Just understanding it all was there was the hard part! 😊 THANK YOU! 🙏 | ||
| for collection in _getproducts(url, bundlelidvid, allcollections): | ||
| # Get collection LIDVIDs from the bundle's properties instead of using the /members endpoint | ||
| # This avoids issues with the /members endpoint and uses the ref_lidvid_collection field | ||
| collection_lidvids = bundle.get("properties", {}).get("ref_lidvid_collection", []) | ||
| _logger.debug("📦 Found %d collections in bundle properties", len(collection_lidvids)) | ||
|
|
||
| # Batch fetch all collections in groups of _apiquerylimit to minimize API calls | ||
| for collection in _getcollections_batch(url, collection_lidvids): | ||
| _addfiles(collection, bac) | ||
| # Still use /members endpoint for getting products from each collection | ||
| for product in _getproducts(url, collection["id"]): | ||
| _addfiles(product, bac) | ||
|
|
||
|
|
||
(Note: the remainder of this page failed to load; reload the page to view the missing content.)