diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py new file mode 100644 index 00000000..e357333f --- /dev/null +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +import logging +from asyncio import Lock +from logging import getLogger +from typing import TYPE_CHECKING, ClassVar, Literal, overload + +from apify_client import ApifyClientAsync + +from ._utils import hash_api_base_url_and_token +from apify._configuration import Configuration + +if TYPE_CHECKING: + from collections.abc import Callable + from types import TracebackType + + from apify_client.clients import ( + DatasetClientAsync, + DatasetCollectionClientAsync, + KeyValueStoreClientAsync, + KeyValueStoreCollectionClientAsync, + RequestQueueClientAsync, + RequestQueueCollectionClientAsync, + ) + +logger = getLogger(__name__) + + +@overload +async def open_by_alias( + *, + alias: str, + storage_type: Literal['Dataset'], + collection_client: DatasetCollectionClientAsync, + get_resource_client_by_id: Callable[[str], DatasetClientAsync], + configuration: Configuration, +) -> DatasetClientAsync: ... + + +@overload +async def open_by_alias( + *, + alias: str, + storage_type: Literal['KeyValueStore'], + collection_client: KeyValueStoreCollectionClientAsync, + get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], + configuration: Configuration, +) -> KeyValueStoreClientAsync: ... + + +@overload +async def open_by_alias( + *, + alias: str, + storage_type: Literal['RequestQueue'], + collection_client: RequestQueueCollectionClientAsync, + get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], + configuration: Configuration, +) -> RequestQueueClientAsync: ... + + +async def open_by_alias( + *, + alias: str, + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], + collection_client: ( + KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync + ), + get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync], + configuration: Configuration, +) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: + """Open storage by alias, creating it if necessary. + + This function resolves storage aliases to their IDs, creating new unnamed storage if needed. + The alias mapping is stored in the default key-value store for persistence across Actor runs. + + Args: + alias: The alias name for the storage (e.g., '__default__', 'my-storage'). + storage_type: The type of storage to open. + collection_client: The Apify API collection client for the storage type. + get_resource_client_by_id: A callable that takes a storage ID and returns the resource client. + configuration: Configuration object containing API credentials and settings. + + Returns: + The storage client for the opened or created storage. + + Raises: + ValueError: If storage ID cannot be determined from API response. + TypeError: If API response format is unexpected. + """ + async with AliasResolver( + storage_type=storage_type, + alias=alias, + configuration=configuration, + ) as alias_resolver: + storage_id = await alias_resolver.resolve_id() + + if storage_id: + # Check if storage with this ID exists + resource_client = get_resource_client_by_id(storage_id) + raw_metadata = await resource_client.get() + if raw_metadata: + return resource_client + + # Create new unnamed storage and store alias mapping + raw_metadata = await collection_client.get_or_create() + + await alias_resolver.store_mapping(storage_id=raw_metadata['id']) + return get_resource_client_by_id(raw_metadata['id']) + + +class AliasResolver: + """Class for handling aliases. + + The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using + default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid + unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on + Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. + """ + + _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' + """Key used for storing the alias mapping in the default kvs.""" + + _ALIAS_STORAGE_KEY_SEPARATOR = ',' + """Separator used in the storage key for storing the alias mapping.""" + + _alias_map: ClassVar[dict[str, str]] = {} + """Map containing pre-existing alias storages and their ids. Global for all instances.""" + + _alias_init_lock: Lock | None = None + """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" + + def __init__( + self, + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], + alias: str, + configuration: Configuration, + ) -> None: + self._storage_type = storage_type + self._alias = alias + self._configuration = configuration + self._additional_cache_key = hash_api_base_url_and_token(configuration) + + async def __aenter__(self) -> AliasResolver: + """Context manager to prevent race condition in alias creation.""" + lock = await self._get_alias_init_lock() + await lock.acquire() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + lock = await self._get_alias_init_lock() + lock.release() + + @classmethod + async def _get_alias_init_lock(cls) -> Lock: + """Get lock for controlling the creation of the alias storages. + + The lock is shared for all instances of the AliasResolver class. + It is created in async method to ensure that some event loop is already running. + """ + if cls._alias_init_lock is None: + cls._alias_init_lock = Lock() + return cls._alias_init_lock + + @classmethod + async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]: + """Get the aliases and storage ids mapping from the default kvs. + + Mapping is loaded from kvs only once and is shared for all instances of the _AliasResolver class. + + Args: + configuration: Configuration object to use for accessing the default KVS. + + Returns: + Map of aliases and storage ids. + """ + if not cls._alias_map and Configuration.get_global_configuration().is_at_home: + default_kvs_client = await cls._get_default_kvs_client(configuration) + + record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) + + # get_record can return {key: ..., value: ..., content_type: ...} + if isinstance(record, dict): + if 'value' in record and isinstance(record['value'], dict): + cls._alias_map = record['value'] + else: + cls._alias_map = record + else: + cls._alias_map = dict[str, str]() + + return cls._alias_map + + async def resolve_id(self) -> str | None: + """Get id of the aliased storage. + + Returns: + Storage id if it exists, None otherwise. + """ + return (await self._get_alias_map(self._configuration)).get(self._storage_key, None) + + async def store_mapping(self, storage_id: str) -> None: + """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" + # Update in-memory mapping + alias_map = await self._get_alias_map(self._configuration) + alias_map[self._storage_key] = storage_id + + if not Configuration.get_global_configuration().is_at_home: + logging.getLogger(__name__).debug( + '_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' + ) + return + + default_kvs_client = await self._get_default_kvs_client(self._configuration) + await default_kvs_client.get() + + try: + record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) + + # get_record can return {key: ..., value: ..., content_type: ...} + if isinstance(record, dict) and 'value' in record: + record = record['value'] + + # Update or create the record with the new alias mapping + if isinstance(record, dict): + record[self._storage_key] = storage_id + else: + record = {self._storage_key: storage_id} + + # Store the mapping back in the KVS. + await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record) + except Exception as exc: + logger.warning(f'Error storing alias mapping for {self._alias}: {exc}') + + @property + def _storage_key(self) -> str: + """Get a unique storage key used for storing the alias in the mapping.""" + return self._ALIAS_STORAGE_KEY_SEPARATOR.join( + [ + self._storage_type, + self._alias, + self._additional_cache_key, + ] + ) + + @staticmethod + async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync: + """Get a client for the default key-value store.""" + apify_client_async = ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + if not configuration.default_key_value_store_id: + raise ValueError("'Configuration.default_key_value_store_id' must be set.") + + return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) diff --git a/src/apify/storage_clients/_apify/_api_client_creation.py b/src/apify/storage_clients/_apify/_api_client_creation.py new file mode 100644 index 00000000..05869580 --- /dev/null +++ b/src/apify/storage_clients/_apify/_api_client_creation.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal, overload + +from apify_client import ApifyClientAsync +from crawlee._utils.crypto import crypto_random_object_id + +from apify.storage_clients._apify._alias_resolving import open_by_alias + +if TYPE_CHECKING: + from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync + + from apify._configuration import Configuration + + +@overload +async def create_storage_api_client( + *, + storage_type: Literal['Dataset'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> DatasetClientAsync: ... + + +@overload +async def create_storage_api_client( + *, + storage_type: Literal['KeyValueStore'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> KeyValueStoreClientAsync: ... + + +@overload +async def create_storage_api_client( + *, + storage_type: Literal['RequestQueue'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> RequestQueueClientAsync: ... + + +async def create_storage_api_client( + *, + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: + """Get an Apify API client for a specific storage type. + + This function handles the creation and initialization of Apify storage clients (KVS, RQ, or Dataset). + It manages authentication, storage lookup/creation, and client instantiation. + + Args: + storage_type: The type of storage to open. + configuration: Configuration object containing API credentials. + id: Storage ID to open. Mutually exclusive with name and alias. + name: Storage name (global scope, persists across runs). Mutually exclusive with id and alias. + alias: Storage alias (run scope, creates unnamed storage). Mutually exclusive with id and name. + + Returns: + The storage client for the opened or created storage. + + Raises: + ValueError: If configuration is invalid, multiple identifiers are provided, or storage cannot be found. + """ + if sum(1 for param in [id, name, alias] if param is not None) > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') + + apify_client = _create_api_client(configuration) + + # Get storage-specific configuration + if storage_type == 'KeyValueStore': + collection_client = apify_client.key_value_stores() + default_id = configuration.default_key_value_store_id + + def get_resource_client(storage_id: str) -> KeyValueStoreClientAsync: + return apify_client.key_value_store(key_value_store_id=storage_id) + + elif storage_type == 'RequestQueue': + collection_client = apify_client.request_queues() # type: ignore[assignment] + default_id = configuration.default_request_queue_id + + def get_resource_client(storage_id: str) -> RequestQueueClientAsync: # type: ignore[misc] + # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. + # It should persist across migrated or resurrected Actor runs on the Apify platform. + _api_max_client_key_length = 32 + client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ + :_api_max_client_key_length + ] + return apify_client.request_queue(request_queue_id=storage_id, client_key=client_key) + + elif storage_type == 'Dataset': + collection_client = apify_client.datasets() # type: ignore[assignment] + default_id = configuration.default_dataset_id + + def get_resource_client(storage_id: str) -> DatasetClientAsync: # type: ignore[misc] + return apify_client.dataset(dataset_id=storage_id) + + else: + raise ValueError(f'Unknown storage type: {storage_type}') + + # Handle different opening scenarios + match (alias, name, id, default_id): + # Normalize unnamed default storage to unnamed storage aliased as `__default__`. + case (None, None, None, None): + return await open_by_alias( + alias='__default__', + storage_type=storage_type, # type: ignore[arg-type] + collection_client=collection_client, + get_resource_client_by_id=get_resource_client, + configuration=configuration, + ) + + # Open by alias. + case (str(), None, None, _): + return await open_by_alias( + alias=alias, + storage_type=storage_type, # type: ignore[arg-type] + collection_client=collection_client, + get_resource_client_by_id=get_resource_client, + configuration=configuration, + ) + + # Open default storage. + case (None, None, None, str()): + resource_client = get_resource_client(default_id) + raw_metadata = await resource_client.get() + # Default storage does not exist. Create a new one. + if not raw_metadata: + raw_metadata = await collection_client.get_or_create() + resource_client = get_resource_client(raw_metadata['id']) + return resource_client + + # Open by name. + case (None, str(), None, _): + raw_metadata = await collection_client.get_or_create(name=name) + return get_resource_client(raw_metadata['id']) + + # Open by ID. + case (None, None, str(), _): + resource_client = get_resource_client(id) + raw_metadata = await resource_client.get() + if raw_metadata is None: + raise ValueError(f'Opening {storage_type} with id={id} failed.') + return resource_client + + raise RuntimeError('Unreachable code') + + +def _create_api_client(configuration: Configuration) -> ApifyClientAsync: + """Create and validate an ApifyClientAsync from the given Configuration.""" + if not configuration.token: + raise ValueError(f'Apify storage client requires a valid token in Configuration (token={configuration.token}).') + + if not configuration.api_base_url: + raise ValueError( + f'Apify storage client requires a valid API URL in Configuration (api_url={configuration.api_base_url}).' + ) + + if not configuration.api_public_base_url: + raise ValueError( + 'Apify storage client requires a valid API public base URL in Configuration ' + f'(api_public_base_url={configuration.api_public_base_url}).' + ) + + return ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + api_public_url=configuration.api_public_base_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 6a30bfbc..a918bddd 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -11,9 +11,8 @@ from crawlee._utils.file import json_dumps from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata -from crawlee.storages import Dataset -from ._utils import AliasResolver, create_apify_client +from ._api_client_creation import create_storage_api_client if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -101,66 +100,16 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_datasets_client = apify_client_async.datasets() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_dataset_id` to unnamed - # storage aliased as `__default__` - if not any([alias, name, id, configuration.default_dataset_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=Dataset, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - new_storage_metadata = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) - - # If name is provided, get or create the storage by name. - elif name: - id = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_dataset_id - if not id: - raise ValueError( - 'Dataset "id", "name", or "alias" must be specified, ' - 'or a default dataset ID must be set in the configuration.' - ) - - # Now create the client for the determined ID - apify_dataset_client = apify_client_async.dataset(dataset_id=id) - - # Fetch its metadata. - metadata = await apify_dataset_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. - if metadata is None: - id = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(), - ).id - apify_dataset_client = apify_client_async.dataset(dataset_id=id) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_dataset_client.get() - if metadata is None: - raise ValueError(f'Opening dataset with id={id}, name={name}, and alias={alias} failed.') + api_client = await create_storage_api_client( + storage_type='Dataset', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) return cls( - api_client=apify_dataset_client, + api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 8d698a80..b422b464 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -9,10 +9,9 @@ from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata -from crawlee.storages import KeyValueStore +from ._api_client_creation import create_storage_api_client from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage -from ._utils import AliasResolver, create_apify_client if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -90,67 +89,15 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_kvss_client = apify_client_async.key_value_stores() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_key_value_store_id` to - # unnamed storage aliased as `__default__` - if not any([alias, name, id, configuration.default_key_value_store_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - # Create a new storage and store the alias mapping - new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) - - # If name is provided, get or create the storage by name. - elif name: - id = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_key_value_store_id - if not id: - raise ValueError( - 'KeyValueStore "id", "name", or "alias" must be specified, ' - 'or a default KeyValueStore ID must be set in the configuration.' - ) - - # Now create the client for the determined ID - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - - # Fetch its metadata. - metadata = await apify_kvs_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. - if metadata is None: - id = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ).id - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_kvs_client.get() - if metadata is None: - raise ValueError(f'Opening key-value store with id={id}, name={name}, and alias={alias} failed.') - + api_client = await create_storage_api_client( + storage_type='KeyValueStore', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) return cls( - api_client=apify_kvs_client, + api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) @@ -168,12 +115,12 @@ async def drop(self) -> None: await self._api_client.delete() @override - async def get_value(self, key: str) -> KeyValueStoreRecord | None: + async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: response = await self._api_client.get_record(key) return KeyValueStoreRecord.model_validate(response) if response else None @override - async def set_value(self, key: str, value: Any, content_type: str | None = None) -> None: + async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: async with self._lock: await self._api_client.set_record( key=key, @@ -182,7 +129,7 @@ async def set_value(self, key: str, value: Any, content_type: str | None = None) ) @override - async def delete_value(self, key: str) -> None: + async def delete_value(self, *, key: str) -> None: async with self._lock: await self._api_client.delete_record(key=key) @@ -220,10 +167,11 @@ async def iterate_keys( exclusive_start_key = list_key_page.next_exclusive_start_key @override - async def record_exists(self, key: str) -> bool: + async def record_exists(self, *, key: str) -> bool: return await self._api_client.record_exists(key=key) - async def get_public_url(self, key: str) -> str: + @override + async def get_public_url(self, *, key: str) -> str: """Get a URL for the given key that may be used to publicly access the value in the remote key-value store. Args: diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 74c48cde..be6bc5c9 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -5,21 +5,19 @@ from typing_extensions import override -from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient -from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata -from crawlee.storages import RequestQueue +from ._api_client_creation import create_storage_api_client from ._models import ApifyRequestQueueMetadata, RequestQueueStats from ._request_queue_shared_client import ApifyRequestQueueSharedClient from ._request_queue_single_client import ApifyRequestQueueSingleClient -from ._utils import AliasResolver, create_apify_client if TYPE_CHECKING: from collections.abc import Sequence from apify_client.clients import RequestQueueClientAsync from crawlee import Request + from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata from apify import Configuration @@ -224,73 +222,25 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_rqs_client = apify_client_async.request_queues() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_request_queue_id` to - # unnamed storage aliased as `__default__` - if not any([alias, name, id, configuration.default_request_queue_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - new_storage_metadata = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) - - # If name is provided, get or create the storage by name. - elif name: - id = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_request_queue_id - if not id: - raise ValueError( - 'RequestQueue "id", "name", or "alias" must be specified, ' - 'or a default default_request_queue_id ID must be set in the configuration.' - ) - - # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. - # It should persist across migrated or resurrected Actor runs on the Apify platform. - _api_max_client_key_length = 32 - client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ - :_api_max_client_key_length - ] - - apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) - - # Fetch its metadata. - metadata = await apify_rq_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. - if metadata is None: - id = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ).id - apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_rq_client.get() - if metadata is None: - raise ValueError(f'Opening request queue with id={id}, name={name}, and alias={alias} failed.') - - metadata_model = RequestQueueMetadata.model_validate(metadata) - - return cls(api_client=apify_rq_client, metadata=metadata_model, access=access) + api_client = await create_storage_api_client( + storage_type='RequestQueue', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) + + # Fetch metadata separately + raw_metadata = await api_client.get() + if raw_metadata is None: + raise ValueError('Failed to retrieve request queue metadata') + metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata) + + return cls( + api_client=api_client, + metadata=metadata, + access=access, + ) @override async def purge(self) -> None: diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 61c47acf..04acac1b 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -9,8 +9,8 @@ from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata +from ._utils import unique_key_to_request_id from apify import Request -from apify.storage_clients._apify._utils import unique_key_to_request_id if TYPE_CHECKING: from collections.abc import Sequence diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index 0648d3f5..5d8174e8 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -1,174 +1,14 @@ from __future__ import annotations -import logging import re -from asyncio import Lock from base64 import b64encode from hashlib import sha256 -from logging import getLogger -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING -from apify_client import ApifyClientAsync from crawlee._utils.crypto import compute_short_hash -from apify._configuration import Configuration - if TYPE_CHECKING: - from types import TracebackType - - from apify_client.clients import KeyValueStoreClientAsync - from crawlee.storages import Dataset, KeyValueStore, RequestQueue - - -logger = getLogger(__name__) - - -class AliasResolver: - """Class for handling aliases. - - The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using - default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid - unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on - Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. - """ - - _alias_map: ClassVar[dict[str, str]] = {} - """Map containing pre-existing alias storages and their ids. Global for all instances.""" - _alias_init_lock: Lock | None = None - """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" - - _ALIAS_STORAGE_KEY_SEPARATOR = ',' - _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' - - def __init__( - self, storage_type: type[Dataset | KeyValueStore | RequestQueue], alias: str, configuration: Configuration - ) -> None: - self._storage_type = storage_type - self._alias = alias - self._additional_cache_key = hash_api_base_url_and_token(configuration) - - async def __aenter__(self) -> AliasResolver: - """Context manager to prevent race condition in alias creation.""" - lock = await self._get_alias_init_lock() - await lock.acquire() - return self - - async def __aexit__( - self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: TracebackType | None - ) -> None: - lock = await self._get_alias_init_lock() - lock.release() - - @classmethod - async def _get_alias_init_lock(cls) -> Lock: - """Get lock for controlling the creation of the alias storages. - - The lock is shared for all instances of the AliasResolver class. - It is created in async method to ensure that some event loop is already running. - """ - if cls._alias_init_lock is None: - cls._alias_init_lock = Lock() - return cls._alias_init_lock - - @classmethod - async def _get_alias_map(cls) -> dict[str, str]: - """Get the aliases and storage ids mapping from the default kvs. - - Mapping is loaded from kvs only once and is shared for all instances of the AliasResolver class. - - Returns: - Map of aliases and storage ids. - """ - if not cls._alias_map and Configuration.get_global_configuration().is_at_home: - default_kvs_client = await _get_default_kvs_client() - - record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) - - # get_record can return {key: ..., value: ..., content_type: ...} - if isinstance(record, dict): - if 'value' in record and isinstance(record['value'], dict): - cls._alias_map = record['value'] - else: - cls._alias_map = record - else: - cls._alias_map = dict[str, str]() - - return cls._alias_map - - async def resolve_id(self) -> str | None: - """Get id of the aliased storage. - - Either locate the id in the in-memory mapping or create the new storage. - - Returns: - Storage id if it exists, None otherwise. - """ - return (await self._get_alias_map()).get(self._storage_key, None) - - async def store_mapping(self, storage_id: str) -> None: - """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" - # Update in-memory mapping - (await self._get_alias_map())[self._storage_key] = storage_id - if not Configuration.get_global_configuration().is_at_home: - logging.getLogger(__name__).debug( - 'AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' - ) - return - - default_kvs_client = await _get_default_kvs_client() - await default_kvs_client.get() - - try: - record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) - - # get_record can return {key: ..., value: ..., content_type: ...} - if isinstance(record, dict) and 'value' in record: - record = record['value'] - - # Update or create the record with the new alias mapping - if isinstance(record, dict): - record[self._storage_key] = storage_id - else: - record = {self._storage_key: storage_id} - - # Store the mapping back in the KVS. - await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record) - except Exception as exc: - logger.warning(f'Error storing alias mapping for {self._alias}: {exc}') - - @property - def _storage_key(self) -> str: - """Get a unique storage key used for storing the alias in the mapping.""" - return self._ALIAS_STORAGE_KEY_SEPARATOR.join( - [ - self._storage_type.__name__, - self._alias, - self._additional_cache_key, - ] - ) - - -async def _get_default_kvs_client() -> KeyValueStoreClientAsync: - """Get a client for the default key-value store.""" - configuration = Configuration.get_global_configuration() - - apify_client_async = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) - if not configuration.default_key_value_store_id: - raise ValueError("'Configuration.default_key_value_store_id' must be set.") - return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) - - -def hash_api_base_url_and_token(configuration: Configuration) -> str: - """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" - if configuration.api_public_base_url is None or configuration.token is None: - raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") - return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode()) + from apify import Configuration def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str: @@ -194,28 +34,8 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> return url_safe_key[:request_id_length] -def create_apify_client(configuration: Configuration) -> ApifyClientAsync: - """Create and return an ApifyClientAsync instance using the provided configuration.""" - if not configuration.token: - raise ValueError(f'Apify storage client requires a valid token in Configuration (token={configuration.token}).') - - api_url = configuration.api_base_url - if not api_url: - raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).') - - api_public_base_url = configuration.api_public_base_url - if not api_public_base_url: - raise ValueError( - 'Apify storage client requires a valid API public base URL in Configuration ' - f'(api_public_base_url={api_public_base_url}).' - ) - - # Create Apify client with the provided token and API URL. - return ApifyClientAsync( - token=configuration.token, - api_url=api_url, - api_public_url=api_public_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) +def hash_api_base_url_and_token(configuration: Configuration) -> str: + """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" + if configuration.api_public_base_url is None or configuration.token is None: + raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") + return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode()) diff --git a/tests/integration/apify_api/conftest.py b/tests/integration/apify_api/conftest.py index ac085da3..d2842475 100644 --- a/tests/integration/apify_api/conftest.py +++ b/tests/integration/apify_api/conftest.py @@ -9,7 +9,7 @@ from crawlee import service_locator import apify._actor -from apify.storage_clients._apify._utils import AliasResolver +from apify.storage_clients._apify._alias_resolving import AliasResolver if TYPE_CHECKING: from collections.abc import Callable diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index cf7f007f..34d48f13 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -17,7 +17,7 @@ import apify._actor import apify.log -from apify.storage_clients._apify._utils import AliasResolver +from apify.storage_clients._apify._alias_resolving import AliasResolver if TYPE_CHECKING: from collections.abc import Callable, Iterator