def _get_session_with_retry() -> requests.Session:
    """Build a ``requests`` session that retries transient GET failures.

    A single retrying adapter is mounted for both ``http://`` and ``https://`` URLs.  It
    allows up to ``_retryattempts`` retries with ``_retrybackoff`` as the exponential backoff
    factor, and treats any response whose status is listed in ``_retrystatus`` as retryable.

    Because ``raise_on_status`` is off, exhausting the retries on a bad status does not raise;
    the caller is expected to inspect the response status itself.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=_retryattempts,
            backoff_factor=_retrybackoff,
            status_forcelist=_retrystatus,
            allowed_methods=["GET"],  # No other HTTP method is retried
            raise_on_status=False,  # Leave status handling to the caller
        )
    )
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
`latest` functionality for now since the API does not support it at this time # url = f"{server_url}/products/{lidvid}/members/{'all' if allcollections else 'latest'}" url = f"{server_url}/products/{lidvid}/members" - params = {"sort": _searchkey, "limit": _apiquerylimit} + # Request only the fields we need to minimize payload size + params = {"sort": _searchkey, "limit": _apiquerylimit, "fields": ",".join(_fields)} while True: _logger.debug('Making request to %s with params %r', url, params) - r = requests.get(url, params=params) # type: ignore - matches = r.json()["data"] + r = session.get(url, params=params) # type: ignore + if not r.ok: + _logger.error("⚠️ Failed to fetch products from %s: HTTP %d", url, r.status_code) + r.raise_for_status() + try: + data = r.json() + except requests.exceptions.JSONDecodeError as e: + _logger.error("⚠️ Failed to parse JSON response from %s: %s", url, e) + _logger.debug("Response content: %s", r.text[:500]) + raise ValueError(f"Invalid JSON response from {url}: {e}") from e + if "data" not in data: + _logger.warning('Response missing expected "data" key from %s', url) + matches = data.get("data", []) num_matches = len(matches) for i in matches: yield i @@ -186,6 +246,48 @@ def _getproducts(server_url: str, lidvid: str, allcollections=True) -> Iterator[ params["search-after"] = matches[-1]["properties"][_searchkey] +def _getcollections_batch(server_url: str, collection_lidvids: list[str]) -> Iterator[dict[str, Any]]: + """Get multiple collections efficiently by batching requests. + + Using the PDS API server at ``server_url``, fetch all collections matching the given + ``collection_lidvids`` by batching them into groups of _apiquerylimit (50) and making + one API call per batch using OR queries. + + Yields collection dictionaries. Collections not found will be omitted from results. 
+ """ + if not collection_lidvids: + return + + session = _get_session_with_retry() + + # Process collections in batches of _apiquerylimit to avoid query string limits + for i in range(0, len(collection_lidvids), _apiquerylimit): + batch = collection_lidvids[i:i + _apiquerylimit] + # Build query: (lidvid eq "urn:..." or lidvid eq "urn:..." or ...) + or_conditions = " or ".join([f'lidvid eq "{lidvid}"' for lidvid in batch]) + query = f"({or_conditions})" + + url = f"{server_url}/products" + params = {"q": query, "limit": _apiquerylimit} + + _logger.debug('Batch fetching %d collections (batch %d-%d of %d)', + len(batch), i + 1, i + len(batch), len(collection_lidvids)) + r = session.get(url, params=params) # type: ignore + if not r.ok: + _logger.error("⚠️ Failed to batch fetch collections: HTTP %d", r.status_code) + r.raise_for_status() + try: + data = r.json() + except requests.exceptions.JSONDecodeError as e: + _logger.error("⚠️ Failed to parse JSON response from batch query: %s", e) + _logger.debug("Response content: %s", r.text[:500]) + raise ValueError(f"Invalid JSON response from batch query: {e}") from e + + matches = data.get("data", []) + for collection in matches: + yield collection + + def _addfiles(product: dict, bac: dict): """Add the PDS files described in the PDS ``product`` to the ``bac``.""" lidvid, props = product["id"], product["properties"] @@ -228,10 +330,15 @@ def _comprehendregistry(url: str, bundlelidvid: str, allcollections=True) -> tup title = bundle.get("title", "«unknown»") _addfiles(bundle, bac) - # It turns out the PDS registry makes this *trivial* compared to the PDS filesystem version; - # Just understanding it all was there was the hard part! 😊 THANK YOU! 
🙏 - for collection in _getproducts(url, bundlelidvid, allcollections): + # Get collection LIDVIDs from the bundle's properties instead of using the /members endpoint + # This avoids issues with the /members endpoint and uses the ref_lidvid_collection field + collection_lidvids = bundle.get("properties", {}).get("ref_lidvid_collection", []) + _logger.debug("📦 Found %d collections in bundle properties", len(collection_lidvids)) + + # Batch fetch all collections in groups of _apiquerylimit to minimize API calls + for collection in _getcollections_batch(url, collection_lidvids): _addfiles(collection, bac) + # Still use /members endpoint for getting products from each collection for product in _getproducts(url, collection["id"]): _addfiles(product, bac)