From 7107cafd91dff2678fa357ba6b402509364ad491 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 31 Oct 2025 13:29:19 +0100 Subject: [PATCH 01/10] Draft proposal. TODO: Abstract and repeat in other storages as well. --- .../_apify/_key_value_store_client.py | 107 +++++++++++------- src/apify/storage_clients/_apify/_utils.py | 2 - 2 files changed, 64 insertions(+), 45 deletions(-) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 8d698a80..03600e31 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -106,54 +106,75 @@ async def open( async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias: id = await _alias.resolve_id() - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - # Create a new storage and store the alias mapping - new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) + if id: + # There was id, storage has to exist, fetch metadata to confirm it. + apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) + raw_metadata = await apify_kvs_client.get() + if raw_metadata: + return cls( + api_client=apify_kvs_client, + api_public_base_url='', + # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) + + # There was no pre-existing alias in the mapping or the id did not point to existing storage. + # Create a new unnamed storage and store the alias mapping. 
+ metadata = ApifyKeyValueStoreMetadata.model_validate( + await apify_kvss_client.get_or_create(), + ) + await _alias.store_mapping(storage_id=metadata.id) + + # Return the client for the newly created storage directly. + # It was just created, no need to refetch it. + return cls( + api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id), + api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) # If name is provided, get or create the storage by name. elif name: - id = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_key_value_store_id - if not id: - raise ValueError( - 'KeyValueStore "id", "name", or "alias" must be specified, ' - 'or a default KeyValueStore ID must be set in the configuration.' - ) + metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name)) - # Now create the client for the determined ID - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - - # Fetch its metadata. - metadata = await apify_kvs_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. - if metadata is None: - id = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ).id + # Freshly fetched named storage. No need to fetch it again. + return cls( + api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id), + api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) + # If id is provided, then storage has to exists. 
+ elif id: + # Now create the client for the determined ID apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_kvs_client.get() - if metadata is None: - raise ValueError(f'Opening key-value store with id={id}, name={name}, and alias={alias} failed.') - - return cls( - api_client=apify_kvs_client, - api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) + # Fetch its metadata. + raw_metadata = await apify_kvs_client.get() + # If metadata is None, it means the storage does not exist. + if raw_metadata is None: + raise ValueError(f'Opening key-value store with id={id} failed.') + return cls( + api_client=apify_kvs_client, + api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) + # Default key-value store ID from configuration + elif configuration.default_key_value_store_id: + # Now create the client for the determined ID + apify_kvs_client = apify_client_async.key_value_store( + key_value_store_id=configuration.default_key_value_store_id + ) + # Fetch its metadata. 
+ raw_metadata = await apify_kvs_client.get() + if not raw_metadata: + metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name)) + apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=metadata.id) + + return cls( + api_client=apify_kvs_client, + api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) + raise RuntimeError('Will never happen') @override async def purge(self) -> None: diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index 0648d3f5..79012d8f 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -98,8 +98,6 @@ async def _get_alias_map(cls) -> dict[str, str]: async def resolve_id(self) -> str | None: """Get id of the aliased storage. - Either locate the id in the in-memory mapping or create the new storage. - Returns: Storage id if it exists, None otherwise. 
""" From 1dacd35eb19bb9563e215543160c0198c08da022 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 08:39:42 +0100 Subject: [PATCH 02/10] Finished KVS, TODO: RQ and Dataset --- .../_apify/_key_value_store_client.py | 123 +++++---------- .../_apify/_request_queue_client.py | 82 ++-------- src/apify/storage_clients/_apify/_utils.py | 140 +++++++++++++++++- 3 files changed, 180 insertions(+), 165 deletions(-) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 03600e31..24f9415a 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -7,23 +7,49 @@ from typing_extensions import override +from apify_client.clients import ( + KeyValueStoreClientAsync, +) from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata from crawlee.storages import KeyValueStore from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage -from ._utils import AliasResolver, create_apify_client +from ._utils import ApiClientFactory if TYPE_CHECKING: from collections.abc import AsyncIterator - from apify_client.clients import KeyValueStoreClientAsync + from apify_client.clients import ( + KeyValueStoreCollectionClientAsync, + ) from apify import Configuration logger = getLogger(__name__) +class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]): + @property + def _collection_client(self) -> KeyValueStoreCollectionClientAsync: + return self._api_client.key_value_stores() + + def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: + return self._api_client.key_value_store(key_value_store_id=id) + + @property + def _default_id(self) -> str | None: + return self._configuration.default_key_value_store_id + + @property + def _storage_type(self) -> 
type[KeyValueStore]: + return KeyValueStore + + @staticmethod + def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata: + return ApifyKeyValueStoreMetadata.model_validate(raw_metadata) + + class ApifyKeyValueStoreClient(KeyValueStoreClient): """An Apify platform implementation of the key-value store client.""" @@ -90,91 +116,14 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_kvss_client = apify_client_async.key_value_stores() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_key_value_store_id` to - # unnamed storage aliased as `__default__` - if not any([alias, name, id, configuration.default_key_value_store_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - if id: - # There was id, storage has to exist, fetch metadata to confirm it. - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - raw_metadata = await apify_kvs_client.get() - if raw_metadata: - return cls( - api_client=apify_kvs_client, - api_public_base_url='', - # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) - - # There was no pre-existing alias in the mapping or the id did not point to existing storage. - # Create a new unnamed storage and store the alias mapping. 
- metadata = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ) - await _alias.store_mapping(storage_id=metadata.id) - - # Return the client for the newly created storage directly. - # It was just created, no need to refetch it. - return cls( - api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id), - api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) - - # If name is provided, get or create the storage by name. - elif name: - metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name)) - - # Freshly fetched named storage. No need to fetch it again. - return cls( - api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id), - api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) - # If id is provided, then storage has to exists. - elif id: - # Now create the client for the determined ID - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id) - # Fetch its metadata. - raw_metadata = await apify_kvs_client.get() - # If metadata is None, it means the storage does not exist. - if raw_metadata is None: - raise ValueError(f'Opening key-value store with id={id} failed.') - return cls( - api_client=apify_kvs_client, - api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) - # Default key-value store ID from configuration - elif configuration.default_key_value_store_id: - # Now create the client for the determined ID - apify_kvs_client = apify_client_async.key_value_store( - key_value_store_id=configuration.default_key_value_store_id - ) - # Fetch its metadata. 
- raw_metadata = await apify_kvs_client.get() - if not raw_metadata: - metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name)) - apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=metadata.id) - - return cls( - api_client=apify_kvs_client, - api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 - lock=asyncio.Lock(), - ) - raise RuntimeError('Will never happen') + api_client, _ =await KvsApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).get_client_with_metadata() + return cls( + api_client=api_client, + api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 + lock=asyncio.Lock(), + ) @override async def purge(self) -> None: diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 74c48cde..696353e9 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -5,21 +5,18 @@ from typing_extensions import override -from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient -from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata -from crawlee.storages import RequestQueue from ._models import ApifyRequestQueueMetadata, RequestQueueStats from ._request_queue_shared_client import ApifyRequestQueueSharedClient from ._request_queue_single_client import ApifyRequestQueueSingleClient -from ._utils import AliasResolver, create_apify_client if TYPE_CHECKING: from collections.abc import Sequence from apify_client.clients import RequestQueueClientAsync from crawlee import Request + from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata from apify import Configuration @@ -224,73 
+221,16 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_rqs_client = apify_client_async.request_queues() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_request_queue_id` to - # unnamed storage aliased as `__default__` - if not any([alias, name, id, configuration.default_request_queue_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - new_storage_metadata = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) - - # If name is provided, get or create the storage by name. - elif name: - id = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_request_queue_id - if not id: - raise ValueError( - 'RequestQueue "id", "name", or "alias" must be specified, ' - 'or a default default_request_queue_id ID must be set in the configuration.' - ) - - # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. - # It should persist across migrated or resurrected Actor runs on the Apify platform. 
- _api_max_client_key_length = 32 - client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ - :_api_max_client_key_length - ] - - apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) - - # Fetch its metadata. - metadata = await apify_rq_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. - if metadata is None: - id = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ).id - apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_rq_client.get() - if metadata is None: - raise ValueError(f'Opening request queue with id={id}, name={name}, and alias={alias} failed.') - - metadata_model = RequestQueueMetadata.model_validate(metadata) - - return cls(api_client=apify_rq_client, metadata=metadata_model, access=access) + _api_client, metadata =await RqApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).get_client_with_metadata() + return cls( + api_client=await RqApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).create_api_client(), + metadata=metadata, + access=access, + ) @override async def purge(self) -> None: diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index 79012d8f..b45a195b 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -2,26 +2,49 @@ import logging import re +from abc import ABC, abstractmethod from asyncio import Lock from base64 import b64encode from hashlib import sha256 from logging import getLogger -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Generic, TypeVar from apify_client import ApifyClientAsync +from apify_client.clients import ( + 
DatasetClientAsync, + DatasetCollectionClientAsync, + KeyValueStoreClientAsync, + KeyValueStoreCollectionClientAsync, + RequestQueueClientAsync, + RequestQueueCollectionClientAsync, +) from crawlee._utils.crypto import compute_short_hash +from crawlee.storage_clients.models import ( + DatasetMetadata, + KeyValueStoreMetadata, + RequestQueueMetadata, +) -from apify._configuration import Configuration +from apify import Configuration +from apify.storage_clients._apify._models import ApifyKeyValueStoreMetadata if TYPE_CHECKING: from types import TracebackType - from apify_client.clients import KeyValueStoreClientAsync - from crawlee.storages import Dataset, KeyValueStore, RequestQueue + from crawlee.storages._base import Storage logger = getLogger(__name__) +ResourceCollectionClient = ( + KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync +) +TResourceClient = TypeVar( + 'TResourceClient', bound=KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync +) +TStorageMetadata = TypeVar( + 'TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata +) class AliasResolver: """Class for handling aliases. 
@@ -40,9 +63,7 @@ class AliasResolver: _ALIAS_STORAGE_KEY_SEPARATOR = ',' _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' - def __init__( - self, storage_type: type[Dataset | KeyValueStore | RequestQueue], alias: str, configuration: Configuration - ) -> None: + def __init__(self, storage_type: type[Storage], alias: str, configuration: Configuration) -> None: self._storage_type = storage_type self._alias = alias self._additional_cache_key = hash_api_base_url_and_token(configuration) @@ -217,3 +238,108 @@ def create_apify_client(configuration: Configuration) -> ApifyClientAsync: min_delay_between_retries_millis=500, timeout_secs=360, ) + + +class ApiClientFactory(ABC, Generic[TResourceClient, TStorageMetadata]): + def __init__(self, configuration: Configuration, alias: str | None, name: str | None, id: str | None) -> None: + if sum(1 for param in [id, name, alias] if param is not None) > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') + + self.alias = alias + self.name = name + self.id = id + self._configuration = configuration + self._api_client = create_apify_client(configuration) + + async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetadata]: + match (self.alias, self.name, self.id, self._default_id): + case (None, None, None, None): + # Normalize unnamed default storage in cases where not defined in `self._default_id` to + # unnamed storage aliased as `__default__`. Used only when running locally. + return await self._open_by_alias('__default__') + + case (str(), None, None, _): + return await self._open_by_alias(self.alias) + + case (None, None, None, str()): + # Now create the client for the determined ID + resource_client = self._get_resource_client(id=self._default_id) + # Fetch its metadata. + raw_metadata = await resource_client.get() + metadata = self._get_metadata(raw_metadata) + if not raw_metadata: + # Do we want this??? Backwards compatibility, so probably yes. 
+ # Default storage does not exist. Create a new one with new id. + raw_metadata = await self._collection_client.get_or_create() + metadata = self._get_metadata(raw_metadata) + resource_client = self._get_resource_client(id=metadata.id) + return resource_client, metadata + + case (None, str(), None, _): + metadata = self._get_metadata( + await self._collection_client.get_or_create(name=self.name)) + # Freshly fetched named storage. No need to fetch it again. + return self._get_resource_client(id=metadata.id), metadata + + case (None, None, str(), _): + # Now create the client for the determined ID + resource_client = self._get_resource_client(id=self.id) + # Fetch its metadata. + raw_metadata = await resource_client.get() + # If metadata is None, it means the storage does not exist. + if raw_metadata is None: + raise ValueError(f'Opening key-value store with id={self.id} failed.') + return resource_client, self._get_metadata(raw_metadata) + + raise RuntimeError('Will never happen') + + @property + @abstractmethod + def _collection_client(self) -> ResourceCollectionClient: + """Get a collection API client.""" + + @property + @abstractmethod + def _default_id(self) -> str | None: + """Get a metadata model class.""" + + @property + @abstractmethod + def _storage_type(self) -> type[Storage]: + """Get a metadata model class.""" + + @abstractmethod + def _get_resource_client(self, id: str) -> TResourceClient: + """Get a resource API client.""" + + @staticmethod + @abstractmethod + def _get_metadata(raw_metadata: dict | None) -> TStorageMetadata: + """Get a metadata model class.""" + + async def _open_by_alias(self, alias: str) -> tuple[TResourceClient, TStorageMetadata]: + # Check if there is pre-existing alias mapping in the default KVS. 
+ async with AliasResolver( + storage_type=self._storage_type, alias=alias, configuration=self._configuration + ) as _alias: + id = await _alias.resolve_id() + + if id: + # There was id, storage has to exist, fetch metadata to confirm it. + resource_client = self._get_resource_client(id=id) + raw_metadata = await resource_client.get() + if raw_metadata: + return resource_client, self._get_metadata(raw_metadata) + # If we do not raise here, we will behave same as for default storage. We create it even though it + # should exist already. Consistency or throw an error??? + + # There was no pre-existing alias in the mapping or the id did not point to existing storage. + # Create a new unnamed storage and store the alias mapping. + metadata = ApifyKeyValueStoreMetadata.model_validate( + await self._collection_client.get_or_create(), + ) + await _alias.store_mapping(storage_id=metadata.id) + + # Return the client for the newly created storage directly. + # It was just created, no need to refetch it. 
+ return self._get_resource_client(id=metadata.id), self._get_metadata(raw_metadata) From b00d2b9d84e3c9ee75f35c5aa94aa4b778445fa6 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 08:53:05 +0100 Subject: [PATCH 03/10] Use factory in Rq and Dataset --- .../storage_clients/_apify/_dataset_client.py | 88 ++++++------------- .../_apify/_key_value_store_client.py | 48 +++++----- .../_apify/_request_queue_client.py | 36 ++++++-- src/apify/storage_clients/_apify/_utils.py | 27 +++--- 4 files changed, 92 insertions(+), 107 deletions(-) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 6a30bfbc..bf78378c 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -7,18 +7,19 @@ from typing_extensions import override +from apify_client.clients import DatasetClientAsync from crawlee._utils.byte_size import ByteSize from crawlee._utils.file import json_dumps from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from crawlee.storages import Dataset -from ._utils import AliasResolver, create_apify_client +from ._utils import ApiClientFactory if TYPE_CHECKING: from collections.abc import AsyncIterator - from apify_client.clients import DatasetClientAsync + from apify_client.clients import DatasetCollectionClientAsync from crawlee._types import JsonSerializable from apify import Configuration @@ -101,66 +102,12 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. 
""" - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - apify_client_async = create_apify_client(configuration) - apify_datasets_client = apify_client_async.datasets() - - # Normalize unnamed default storage in cases where not defined in `configuration.default_dataset_id` to unnamed - # storage aliased as `__default__` - if not any([alias, name, id, configuration.default_dataset_id]): - alias = '__default__' - - if alias: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver(storage_type=Dataset, alias=alias, configuration=configuration) as _alias: - id = await _alias.resolve_id() - - # There was no pre-existing alias in the mapping. - # Create a new unnamed storage and store the mapping. - if id is None: - new_storage_metadata = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(), - ) - id = new_storage_metadata.id - await _alias.store_mapping(storage_id=id) - - # If name is provided, get or create the storage by name. - elif name: - id = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(name=name), - ).id - - # If none are provided, try to get the default storage ID from environment variables. - elif id is None: - id = configuration.default_dataset_id - if not id: - raise ValueError( - 'Dataset "id", "name", or "alias" must be specified, ' - 'or a default dataset ID must be set in the configuration.' - ) - - # Now create the client for the determined ID - apify_dataset_client = apify_client_async.dataset(dataset_id=id) - - # Fetch its metadata. - metadata = await apify_dataset_client.get() - - # If metadata is None, it means the storage does not exist, so we create it. 
- if metadata is None: - id = DatasetMetadata.model_validate( - await apify_datasets_client.get_or_create(), - ).id - apify_dataset_client = apify_client_async.dataset(dataset_id=id) - - # Verify that the storage exists by fetching its metadata again. - metadata = await apify_dataset_client.get() - if metadata is None: - raise ValueError(f'Opening dataset with id={id}, name={name}, and alias={alias} failed.') + api_client, _ = await DatasetApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).get_client_with_metadata() return cls( - api_client=apify_dataset_client, + api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) @@ -309,3 +256,24 @@ async def _chunk_by_size(self, items: AsyncIterator[str]) -> AsyncIterator[str]: last_chunk_size = payload_size + ByteSize(2) # Add 2 bytes for [] wrapper. yield f'[{",".join(current_chunk)}]' + + +class DatasetApiClientFactory(ApiClientFactory[DatasetClientAsync, DatasetMetadata]): + @property + def _collection_client(self) -> DatasetCollectionClientAsync: + return self._api_client.datasets() + + def _get_resource_client(self, id: str) -> DatasetClientAsync: + return self._api_client.dataset(dataset_id=id) + + @property + def _default_id(self) -> str | None: + return self._configuration.default_dataset_id + + @property + def _storage_type(self) -> type[Dataset]: + return Dataset + + @staticmethod + def _get_metadata(raw_metadata: dict | None) -> DatasetMetadata: + return DatasetMetadata.model_validate(raw_metadata) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 24f9415a..3979eee1 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -29,27 +29,6 @@ logger = getLogger(__name__) -class 
KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]): - @property - def _collection_client(self) -> KeyValueStoreCollectionClientAsync: - return self._api_client.key_value_stores() - - def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: - return self._api_client.key_value_store(key_value_store_id=id) - - @property - def _default_id(self) -> str | None: - return self._configuration.default_key_value_store_id - - @property - def _storage_type(self) -> type[KeyValueStore]: - return KeyValueStore - - @staticmethod - def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata: - return ApifyKeyValueStoreMetadata.model_validate(raw_metadata) - - class ApifyKeyValueStoreClient(KeyValueStoreClient): """An Apify platform implementation of the key-value store client.""" @@ -116,9 +95,9 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - api_client, _ =await KvsApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).get_client_with_metadata() + api_client, _ = await KvsApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).get_client_with_metadata() return cls( api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 @@ -203,3 +182,24 @@ async def get_public_url(self, key: str) -> str: A public URL that can be used to access the value of the given key in the KVS. 
""" return await self._api_client.get_record_public_url(key=key) + + +class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]): + @property + def _collection_client(self) -> KeyValueStoreCollectionClientAsync: + return self._api_client.key_value_stores() + + def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: + return self._api_client.key_value_store(key_value_store_id=id) + + @property + def _default_id(self) -> str | None: + return self._configuration.default_key_value_store_id + + @property + def _storage_type(self) -> type[KeyValueStore]: + return KeyValueStore + + @staticmethod + def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata: + return ApifyKeyValueStoreMetadata.model_validate(raw_metadata) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 696353e9..62cbb1e8 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -5,16 +5,19 @@ from typing_extensions import override +from apify_client.clients import RequestQueueClientAsync from crawlee.storage_clients._base import RequestQueueClient +from crawlee.storages import RequestQueue from ._models import ApifyRequestQueueMetadata, RequestQueueStats from ._request_queue_shared_client import ApifyRequestQueueSharedClient from ._request_queue_single_client import ApifyRequestQueueSingleClient +from ._utils import ApiClientFactory if TYPE_CHECKING: from collections.abc import Sequence - from apify_client.clients import RequestQueueClientAsync + from apify_client.clients import RequestQueueCollectionClientAsync from crawlee import Request from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata @@ -221,13 +224,11 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is 
available in the configuration. """ - _api_client, metadata =await RqApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).get_client_with_metadata() + _api_client, metadata = await RqApiClientFactory( + configuration=configuration, alias=alias, name=name, id=id + ).get_client_with_metadata() return cls( - api_client=await RqApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).create_api_client(), + api_client=_api_client, metadata=metadata, access=access, ) @@ -242,3 +243,24 @@ async def purge(self) -> None: @override async def drop(self) -> None: await self._api_client.delete() + + +class RqApiClientFactory(ApiClientFactory[RequestQueueClientAsync, ApifyRequestQueueMetadata]): + @property + def _collection_client(self) -> RequestQueueCollectionClientAsync: + return self._api_client.request_queues() + + def _get_resource_client(self, id: str) -> RequestQueueClientAsync: + return self._api_client.request_queue(request_queue_id=id) + + @property + def _default_id(self) -> str | None: + return self._configuration.default_request_queue_id + + @property + def _storage_type(self) -> type[RequestQueue]: + return RequestQueue + + @staticmethod + def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata: + return ApifyRequestQueueMetadata.model_validate(raw_metadata) diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index b45a195b..c4e39120 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -42,9 +42,8 @@ TResourceClient = TypeVar( 'TResourceClient', bound=KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync ) -TStorageMetadata = TypeVar( - 'TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata -) +TStorageMetadata = TypeVar('TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata) + class AliasResolver: 
"""Class for handling aliases. @@ -245,26 +244,24 @@ def __init__(self, configuration: Configuration, alias: str | None, name: str | if sum(1 for param in [id, name, alias] if param is not None) > 1: raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - self.alias = alias - self.name = name - self.id = id + self._alias = alias + self._name = name + self._id = id self._configuration = configuration self._api_client = create_apify_client(configuration) async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetadata]: - match (self.alias, self.name, self.id, self._default_id): + match (self._alias, self._name, self._id, self._default_id): case (None, None, None, None): # Normalize unnamed default storage in cases where not defined in `self._default_id` to # unnamed storage aliased as `__default__`. Used only when running locally. return await self._open_by_alias('__default__') case (str(), None, None, _): - return await self._open_by_alias(self.alias) + return await self._open_by_alias(self._alias) case (None, None, None, str()): - # Now create the client for the determined ID resource_client = self._get_resource_client(id=self._default_id) - # Fetch its metadata. raw_metadata = await resource_client.get() metadata = self._get_metadata(raw_metadata) if not raw_metadata: @@ -276,22 +273,20 @@ async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetad return resource_client, metadata case (None, str(), None, _): - metadata = self._get_metadata( - await self._collection_client.get_or_create(name=self.name)) + metadata = self._get_metadata(await self._collection_client.get_or_create(name=self._name)) # Freshly fetched named storage. No need to fetch it again. 
return self._get_resource_client(id=metadata.id), metadata case (None, None, str(), _): - # Now create the client for the determined ID - resource_client = self._get_resource_client(id=self.id) + resource_client = self._get_resource_client(id=self._id) # Fetch its metadata. raw_metadata = await resource_client.get() # If metadata is None, it means the storage does not exist. if raw_metadata is None: - raise ValueError(f'Opening key-value store with id={self.id} failed.') + raise ValueError(f'Opening {self._storage_type} with id={self._id} failed.') return resource_client, self._get_metadata(raw_metadata) - raise RuntimeError('Will never happen') + raise RuntimeError('Unreachable code') @property @abstractmethod From bf0b75c1258d1e58cd7fead0c74ac986e9893063 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 09:06:05 +0100 Subject: [PATCH 04/10] Re-order methods --- src/apify/storage_clients/_apify/_dataset_client.py | 6 +++--- .../storage_clients/_apify/_key_value_store_client.py | 6 +++--- src/apify/storage_clients/_apify/_request_queue_client.py | 6 +++--- src/apify/storage_clients/_apify/_utils.py | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index bf78378c..e598280d 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -263,9 +263,6 @@ class DatasetApiClientFactory(ApiClientFactory[DatasetClientAsync, DatasetMetada def _collection_client(self) -> DatasetCollectionClientAsync: return self._api_client.datasets() - def _get_resource_client(self, id: str) -> DatasetClientAsync: - return self._api_client.dataset(dataset_id=id) - @property def _default_id(self) -> str | None: return self._configuration.default_dataset_id @@ -277,3 +274,6 @@ def _storage_type(self) -> type[Dataset]: @staticmethod def _get_metadata(raw_metadata: dict | None) -> 
DatasetMetadata: return DatasetMetadata.model_validate(raw_metadata) + + def _get_resource_client(self, id: str) -> DatasetClientAsync: + return self._api_client.dataset(dataset_id=id) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 3979eee1..8c481430 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -189,9 +189,6 @@ class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyVal def _collection_client(self) -> KeyValueStoreCollectionClientAsync: return self._api_client.key_value_stores() - def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: - return self._api_client.key_value_store(key_value_store_id=id) - @property def _default_id(self) -> str | None: return self._configuration.default_key_value_store_id @@ -203,3 +200,6 @@ def _storage_type(self) -> type[KeyValueStore]: @staticmethod def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata: return ApifyKeyValueStoreMetadata.model_validate(raw_metadata) + + def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: + return self._api_client.key_value_store(key_value_store_id=id) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 62cbb1e8..9bc93cf4 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -250,9 +250,6 @@ class RqApiClientFactory(ApiClientFactory[RequestQueueClientAsync, ApifyRequestQ def _collection_client(self) -> RequestQueueCollectionClientAsync: return self._api_client.request_queues() - def _get_resource_client(self, id: str) -> RequestQueueClientAsync: - return self._api_client.request_queue(request_queue_id=id) - @property def _default_id(self) -> str | None: return 
self._configuration.default_request_queue_id @@ -264,3 +261,6 @@ def _storage_type(self) -> type[RequestQueue]: @staticmethod def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata: return ApifyRequestQueueMetadata.model_validate(raw_metadata) + + def _get_resource_client(self, id: str) -> RequestQueueClientAsync: + return self._api_client.request_queue(request_queue_id=id) diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index c4e39120..583e84d6 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -303,10 +303,6 @@ def _default_id(self) -> str | None: def _storage_type(self) -> type[Storage]: """Get a metadata model class.""" - @abstractmethod - def _get_resource_client(self, id: str) -> TResourceClient: - """Get a resource API client.""" - @staticmethod @abstractmethod def _get_metadata(raw_metadata: dict | None) -> TStorageMetadata: @@ -338,3 +334,7 @@ async def _open_by_alias(self, alias: str) -> tuple[TResourceClient, TStorageMet # Return the client for the newly created storage directly. # It was just created, no need to refetch it. 
return self._get_resource_client(id=metadata.id), self._get_metadata(raw_metadata) + + @abstractmethod + def _get_resource_client(self, id: str) -> TResourceClient: + """Get a resource API client.""" From cc903b1bbbfb7548fc9ef0c7a000aab41525bc1b Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 09:12:14 +0100 Subject: [PATCH 05/10] Fix cyclic import --- src/apify/storage_clients/_apify/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index 583e84d6..a1a6b4be 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -25,7 +25,7 @@ RequestQueueMetadata, ) -from apify import Configuration +from apify._configuration import Configuration from apify.storage_clients._apify._models import ApifyKeyValueStoreMetadata if TYPE_CHECKING: From 531869a36bdf380d02b62695e311b7409c170a24 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 11:32:28 +0100 Subject: [PATCH 06/10] Fix failing integration tests --- src/apify/storage_clients/_apify/_utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index a1a6b4be..286b669a 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -326,9 +326,8 @@ async def _open_by_alias(self, alias: str) -> tuple[TResourceClient, TStorageMet # There was no pre-existing alias in the mapping or the id did not point to existing storage. # Create a new unnamed storage and store the alias mapping. 
- metadata = ApifyKeyValueStoreMetadata.model_validate( - await self._collection_client.get_or_create(), - ) + raw_metadata = await self._collection_client.get_or_create() + metadata = ApifyKeyValueStoreMetadata.model_validate(raw_metadata) await _alias.store_mapping(storage_id=metadata.id) # Return the client for the newly created storage directly. From 38df4eb9cc56b41d1a5e2521285a1f3641eb6502 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 5 Nov 2025 13:20:59 +0100 Subject: [PATCH 07/10] Use forgotten RQ specific code --- .../storage_clients/_apify/_request_queue_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 9bc93cf4..3f40eb86 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -6,6 +6,7 @@ from typing_extensions import override from apify_client.clients import RequestQueueClientAsync +from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient from crawlee.storages import RequestQueue @@ -263,4 +264,10 @@ def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata: return ApifyRequestQueueMetadata.model_validate(raw_metadata) def _get_resource_client(self, id: str) -> RequestQueueClientAsync: - return self._api_client.request_queue(request_queue_id=id) + # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. + # It should persist across migrated or resurrected Actor runs on the Apify platform. 
+ _api_max_client_key_length = 32 + client_key = (self._configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ + :_api_max_client_key_length + ] + return self._api_client.request_queue(request_queue_id=id, client_key=client_key) From 7c40de03a005489f3c2a9fadd9130269ed81cb98 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 11 Nov 2025 15:38:52 +0100 Subject: [PATCH 08/10] Functional approach Without additional inheritance in individual storage clients. --- .../storage_clients/_apify/_alias_resolver.py | 271 +++++++++++++++ .../_apify/_api_client_factory.py | 183 ++++++++++ .../storage_clients/_apify/_dataset_client.py | 37 +-- .../_apify/_key_value_store_client.py | 52 +-- .../_apify/_request_queue_client.py | 53 +-- .../_apify/_request_queue_single_client.py | 2 +- src/apify/storage_clients/_apify/_utils.py | 312 +----------------- tests/integration/apify_api/conftest.py | 2 +- tests/unit/conftest.py | 2 +- 9 files changed, 505 insertions(+), 409 deletions(-) create mode 100644 src/apify/storage_clients/_apify/_alias_resolver.py create mode 100644 src/apify/storage_clients/_apify/_api_client_factory.py diff --git a/src/apify/storage_clients/_apify/_alias_resolver.py b/src/apify/storage_clients/_apify/_alias_resolver.py new file mode 100644 index 00000000..aaba5ae5 --- /dev/null +++ b/src/apify/storage_clients/_apify/_alias_resolver.py @@ -0,0 +1,271 @@ +from __future__ import annotations + +import logging +from asyncio import Lock +from logging import getLogger +from typing import TYPE_CHECKING, ClassVar, overload + +from apify_client import ApifyClientAsync + +from ._utils import hash_api_base_url_and_token +from apify._configuration import Configuration + +if TYPE_CHECKING: + from collections.abc import Callable + from types import TracebackType + + from apify_client.clients import ( + DatasetClientAsync, + DatasetCollectionClientAsync, + KeyValueStoreClientAsync, + KeyValueStoreCollectionClientAsync, + 
RequestQueueClientAsync, + RequestQueueCollectionClientAsync, + ) + from crawlee.storages import Dataset, KeyValueStore, RequestQueue + + +logger = getLogger(__name__) + + +@overload +async def open_by_alias( + *, + alias: str, + storage_class: type[KeyValueStore], + collection_client: KeyValueStoreCollectionClientAsync, + get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], + configuration: Configuration, +) -> KeyValueStoreClientAsync: ... + + +@overload +async def open_by_alias( + *, + alias: str, + storage_class: type[RequestQueue], + collection_client: RequestQueueCollectionClientAsync, + get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], + configuration: Configuration, +) -> RequestQueueClientAsync: ... + + +@overload +async def open_by_alias( + *, + alias: str, + storage_class: type[Dataset], + collection_client: DatasetCollectionClientAsync, + get_resource_client_by_id: Callable[[str], DatasetClientAsync], + configuration: Configuration, +) -> DatasetClientAsync: ... + + +async def open_by_alias( + *, + alias: str, + storage_class: type[Dataset | KeyValueStore | RequestQueue], + collection_client: ( + KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync + ), + get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync], + configuration: Configuration, +) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: + """Open storage by alias, creating it if necessary. + + This function resolves storage aliases to their IDs, creating new unnamed storage if needed. + The alias mapping is stored in the default key-value store for persistence across Actor runs. + + Args: + alias: The alias name for the storage (e.g., '__default__', 'my-storage'). + storage_class: The storage class type (KeyValueStore, RequestQueue, or Dataset). + collection_client: The Apify API collection client for the storage type. 
+ get_resource_client_by_id: A callable that takes a storage ID and returns the resource client. + configuration: Configuration object containing API credentials and settings. + + Returns: + The storage client for the opened or created storage. + + Raises: + ValueError: If storage ID cannot be determined from API response. + TypeError: If API response format is unexpected. + """ + async with AliasResolver( + storage_type=storage_class, + alias=alias, + configuration=configuration, + ) as alias_resolver: + storage_id = await alias_resolver.resolve_id() + + if storage_id: + # Check if storage with this ID exists + resource_client = get_resource_client_by_id(storage_id) + raw_metadata = await resource_client.get() + if raw_metadata: + return resource_client + + # Create new unnamed storage and store alias mapping + raw_metadata = await collection_client.get_or_create() + + # Determine metadata class to parse the ID + if isinstance(raw_metadata, dict): + storage_id = raw_metadata.get('id') + if not storage_id: + raise ValueError('Failed to get storage ID from API response') + else: + raise TypeError('Unexpected API response format') + + await alias_resolver.store_mapping(storage_id=storage_id) + return get_resource_client_by_id(storage_id) + + +class AliasResolver: + """Class for handling aliases. + + The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using + default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid + unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on + Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. + """ + + _alias_map: ClassVar[dict[str, str]] = {} + """Map containing pre-existing alias storages and their ids. 
Global for all instances.""" + + _alias_init_lock: Lock | None = None + """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" + + _ALIAS_STORAGE_KEY_SEPARATOR = ',' + _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' + + def __init__( + self, + storage_type: type[Dataset | KeyValueStore | RequestQueue], + alias: str, + configuration: Configuration, + ) -> None: + self._storage_type = storage_type + self._alias = alias + self._configuration = configuration + self._additional_cache_key = hash_api_base_url_and_token(configuration) + + async def __aenter__(self) -> AliasResolver: + """Context manager to prevent race condition in alias creation.""" + lock = await self._get_alias_init_lock() + await lock.acquire() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + lock = await self._get_alias_init_lock() + lock.release() + + @classmethod + async def _get_alias_init_lock(cls) -> Lock: + """Get lock for controlling the creation of the alias storages. + + The lock is shared for all instances of the AliasResolver class. + It is created in async method to ensure that some event loop is already running. + """ + if cls._alias_init_lock is None: + cls._alias_init_lock = Lock() + return cls._alias_init_lock + + @classmethod + async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]: + """Get the aliases and storage ids mapping from the default kvs. + + Mapping is loaded from kvs only once and is shared for all instances of the _AliasResolver class. + + Args: + configuration: Configuration object to use for accessing the default KVS. + + Returns: + Map of aliases and storage ids. 
+ """ + if not cls._alias_map and Configuration.get_global_configuration().is_at_home: + default_kvs_client = await cls._get_default_kvs_client(configuration) + + record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) + + # get_record can return {key: ..., value: ..., content_type: ...} + if isinstance(record, dict): + if 'value' in record and isinstance(record['value'], dict): + cls._alias_map = record['value'] + else: + cls._alias_map = record + else: + cls._alias_map = dict[str, str]() + + return cls._alias_map + + async def resolve_id(self) -> str | None: + """Get id of the aliased storage. + + Returns: + Storage id if it exists, None otherwise. + """ + return (await self._get_alias_map(self._configuration)).get(self._storage_key, None) + + async def store_mapping(self, storage_id: str) -> None: + """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" + # Update in-memory mapping + alias_map = await self._get_alias_map(self._configuration) + alias_map[self._storage_key] = storage_id + + if not Configuration.get_global_configuration().is_at_home: + logging.getLogger(__name__).debug( + '_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' + ) + return + + default_kvs_client = await self._get_default_kvs_client(self._configuration) + await default_kvs_client.get() + + try: + record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) + + # get_record can return {key: ..., value: ..., content_type: ...} + if isinstance(record, dict) and 'value' in record: + record = record['value'] + + # Update or create the record with the new alias mapping + if isinstance(record, dict): + record[self._storage_key] = storage_id + else: + record = {self._storage_key: storage_id} + + # Store the mapping back in the KVS. 
+ await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record) + except Exception as exc: + logger.warning(f'Error storing alias mapping for {self._alias}: {exc}') + + @property + def _storage_key(self) -> str: + """Get a unique storage key used for storing the alias in the mapping.""" + return self._ALIAS_STORAGE_KEY_SEPARATOR.join( + [ + self._storage_type.__name__, + self._alias, + self._additional_cache_key, + ] + ) + + @staticmethod + async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync: + """Get a client for the default key-value store.""" + apify_client_async = ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + if not configuration.default_key_value_store_id: + raise ValueError("'Configuration.default_key_value_store_id' must be set.") + + return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) diff --git a/src/apify/storage_clients/_apify/_api_client_factory.py b/src/apify/storage_clients/_apify/_api_client_factory.py new file mode 100644 index 00000000..6ec00e0a --- /dev/null +++ b/src/apify/storage_clients/_apify/_api_client_factory.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal, overload + +from apify_client import ApifyClientAsync +from crawlee._utils.crypto import crypto_random_object_id +from crawlee.storages import Dataset, KeyValueStore, RequestQueue + +from apify.storage_clients._apify._alias_resolver import open_by_alias + +if TYPE_CHECKING: + from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync + + from apify._configuration import Configuration + + +@overload +async def create_api_client( + *, + storage_type: Literal['key_value_store'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | 
None = None, +) -> KeyValueStoreClientAsync: ... + + +@overload +async def create_api_client( + *, + storage_type: Literal['request_queue'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> RequestQueueClientAsync: ... + + +@overload +async def create_api_client( + *, + storage_type: Literal['dataset'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> DatasetClientAsync: ... + + +async def create_api_client( + *, + storage_type: Literal['key_value_store', 'request_queue', 'dataset'], + configuration: Configuration, + id: str | None = None, + name: str | None = None, + alias: str | None = None, +) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: + """Get an Apify storage API client. + + This function handles the creation and initialization of Apify storage clients (KVS, RQ, or Dataset). + It manages authentication, storage lookup/creation, and client instantiation. + + Args: + storage_type: Type of storage to open ('key_value_store', 'request_queue', or 'dataset'). + configuration: Configuration object containing API credentials and settings. + id: Storage ID to open. Mutually exclusive with name and alias. + name: Storage name (global scope, persists across runs). Mutually exclusive with id and alias. + alias: Storage alias (run scope, creates unnamed storage). Mutually exclusive with id and name. + + Returns: + The storage client for the opened or created storage. + + Raises: + ValueError: If configuration is invalid, multiple identifiers are provided, or storage cannot be found. 
+ """ + if sum(1 for param in [id, name, alias] if param is not None) > 1: + raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') + + # Validate configuration + if not configuration.token: + raise ValueError(f'Apify storage client requires a valid token in Configuration (token={configuration.token}).') + + if not configuration.api_base_url: + raise ValueError( + f'Apify storage client requires a valid API URL in Configuration (api_url={configuration.api_base_url}).' + ) + + if not configuration.api_public_base_url: + raise ValueError( + 'Apify storage client requires a valid API public base URL in Configuration ' + f'(api_public_base_url={configuration.api_public_base_url}).' + ) + + api_client = ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + api_public_url=configuration.api_public_base_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + # Get storage-specific configuration + if storage_type == 'key_value_store': + collection_client = api_client.key_value_stores() + default_id = configuration.default_key_value_store_id + storage_class = KeyValueStore + + def get_resource_client(storage_id: str) -> KeyValueStoreClientAsync: + return api_client.key_value_store(key_value_store_id=storage_id) + + elif storage_type == 'request_queue': + collection_client = api_client.request_queues() # type: ignore[assignment] + default_id = configuration.default_request_queue_id + storage_class = RequestQueue # type: ignore[assignment] + + def get_resource_client(storage_id: str) -> RequestQueueClientAsync: # type: ignore[misc] + # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. + # It should persist across migrated or resurrected Actor runs on the Apify platform. 
+ _api_max_client_key_length = 32 + client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ + :_api_max_client_key_length + ] + return api_client.request_queue(request_queue_id=storage_id, client_key=client_key) + + elif storage_type == 'dataset': + collection_client = api_client.datasets() # type: ignore[assignment] + default_id = configuration.default_dataset_id + storage_class = Dataset # type: ignore[assignment] + + def get_resource_client(storage_id: str) -> DatasetClientAsync: # type: ignore[misc] + return api_client.dataset(dataset_id=storage_id) + + else: + raise ValueError(f'Unknown storage type: {storage_type}') + + # Handle different opening scenarios + match (alias, name, id, default_id): + case (None, None, None, None): + # Normalize unnamed default storage to unnamed storage aliased as `__default__`. + # Used only when running locally. + return await open_by_alias( + alias='__default__', + storage_class=storage_class, + collection_client=collection_client, + get_resource_client_by_id=get_resource_client, + configuration=configuration, + ) + + case (str(), None, None, _): + # Open by alias + return await open_by_alias( + alias=alias, + storage_class=storage_class, + collection_client=collection_client, + get_resource_client_by_id=get_resource_client, + configuration=configuration, + ) + + case (None, None, None, str()): + # Open default storage + resource_client = get_resource_client(default_id) + raw_metadata = await resource_client.get() + if not raw_metadata: + # Default storage does not exist. Create a new one. 
+ raw_metadata = await collection_client.get_or_create() + resource_client = get_resource_client(raw_metadata['id']) + return resource_client + + case (None, str(), None, _): + # Open by name + raw_metadata = await collection_client.get_or_create(name=name) + return get_resource_client(raw_metadata['id']) + + case (None, None, str(), _): + # Open by ID + resource_client = get_resource_client(id) + raw_metadata = await resource_client.get() + if raw_metadata is None: + raise ValueError(f'Opening {storage_class} with id={id} failed.') + return resource_client + + raise RuntimeError('Unreachable code') diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index e598280d..60d24056 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -7,19 +7,17 @@ from typing_extensions import override -from apify_client.clients import DatasetClientAsync from crawlee._utils.byte_size import ByteSize from crawlee._utils.file import json_dumps from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata -from crawlee.storages import Dataset -from ._utils import ApiClientFactory +from ._api_client_factory import create_api_client if TYPE_CHECKING: from collections.abc import AsyncIterator - from apify_client.clients import DatasetCollectionClientAsync + from apify_client.clients import DatasetClientAsync from crawlee._types import JsonSerializable from apify import Configuration @@ -102,9 +100,13 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. 
""" - api_client, _ = await DatasetApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).get_client_with_metadata() + api_client = await create_api_client( + storage_type='dataset', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) return cls( api_client=api_client, @@ -256,24 +258,3 @@ async def _chunk_by_size(self, items: AsyncIterator[str]) -> AsyncIterator[str]: last_chunk_size = payload_size + ByteSize(2) # Add 2 bytes for [] wrapper. yield f'[{",".join(current_chunk)}]' - - -class DatasetApiClientFactory(ApiClientFactory[DatasetClientAsync, DatasetMetadata]): - @property - def _collection_client(self) -> DatasetCollectionClientAsync: - return self._api_client.datasets() - - @property - def _default_id(self) -> str | None: - return self._configuration.default_dataset_id - - @property - def _storage_type(self) -> type[Dataset]: - return Dataset - - @staticmethod - def _get_metadata(raw_metadata: dict | None) -> DatasetMetadata: - return DatasetMetadata.model_validate(raw_metadata) - - def _get_resource_client(self, id: str) -> DatasetClientAsync: - return self._api_client.dataset(dataset_id=id) diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 8c481430..54c8906a 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -7,22 +7,16 @@ from typing_extensions import override -from apify_client.clients import ( - KeyValueStoreClientAsync, -) from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata -from crawlee.storages import KeyValueStore +from ._api_client_factory import create_api_client from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage -from ._utils import ApiClientFactory if TYPE_CHECKING: from collections.abc import 
AsyncIterator - from apify_client.clients import ( - KeyValueStoreCollectionClientAsync, - ) + from apify_client.clients import KeyValueStoreClientAsync from apify import Configuration @@ -95,9 +89,13 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - api_client, _ = await KvsApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).get_client_with_metadata() + api_client = await create_api_client( + storage_type='key_value_store', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) return cls( api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 @@ -117,12 +115,12 @@ async def drop(self) -> None: await self._api_client.delete() @override - async def get_value(self, key: str) -> KeyValueStoreRecord | None: + async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: response = await self._api_client.get_record(key) return KeyValueStoreRecord.model_validate(response) if response else None @override - async def set_value(self, key: str, value: Any, content_type: str | None = None) -> None: + async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: async with self._lock: await self._api_client.set_record( key=key, @@ -131,7 +129,7 @@ async def set_value(self, key: str, value: Any, content_type: str | None = None) ) @override - async def delete_value(self, key: str) -> None: + async def delete_value(self, *, key: str) -> None: async with self._lock: await self._api_client.delete_record(key=key) @@ -169,10 +167,11 @@ async def iterate_keys( exclusive_start_key = list_key_page.next_exclusive_start_key @override - async def record_exists(self, key: str) -> bool: + async def record_exists(self, *, key: str) -> bool: return await self._api_client.record_exists(key=key) - async def get_public_url(self, key: str) -> 
str: + @override + async def get_public_url(self, *, key: str) -> str: """Get a URL for the given key that may be used to publicly access the value in the remote key-value store. Args: @@ -182,24 +181,3 @@ async def get_public_url(self, key: str) -> str: A public URL that can be used to access the value of the given key in the KVS. """ return await self._api_client.get_record_public_url(key=key) - - -class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]): - @property - def _collection_client(self) -> KeyValueStoreCollectionClientAsync: - return self._api_client.key_value_stores() - - @property - def _default_id(self) -> str | None: - return self._configuration.default_key_value_store_id - - @property - def _storage_type(self) -> type[KeyValueStore]: - return KeyValueStore - - @staticmethod - def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata: - return ApifyKeyValueStoreMetadata.model_validate(raw_metadata) - - def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync: - return self._api_client.key_value_store(key_value_store_id=id) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 3f40eb86..86888626 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -5,20 +5,17 @@ from typing_extensions import override -from apify_client.clients import RequestQueueClientAsync -from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient -from crawlee.storages import RequestQueue +from ._api_client_factory import create_api_client from ._models import ApifyRequestQueueMetadata, RequestQueueStats from ._request_queue_shared_client import ApifyRequestQueueSharedClient from ._request_queue_single_client import ApifyRequestQueueSingleClient -from ._utils import ApiClientFactory if 
TYPE_CHECKING: from collections.abc import Sequence - from apify_client.clients import RequestQueueCollectionClientAsync + from apify_client.clients import RequestQueueClientAsync from crawlee import Request from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata @@ -225,11 +222,22 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - _api_client, metadata = await RqApiClientFactory( - configuration=configuration, alias=alias, name=name, id=id - ).get_client_with_metadata() + api_client = await create_api_client( + storage_type='request_queue', + configuration=configuration, + alias=alias, + name=name, + id=id, + ) + + # Fetch metadata separately + raw_metadata = await api_client.get() + if raw_metadata is None: + raise ValueError('Failed to retrieve request queue metadata') + metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata) + return cls( - api_client=_api_client, + api_client=api_client, metadata=metadata, access=access, ) @@ -244,30 +252,3 @@ async def purge(self) -> None: @override async def drop(self) -> None: await self._api_client.delete() - - -class RqApiClientFactory(ApiClientFactory[RequestQueueClientAsync, ApifyRequestQueueMetadata]): - @property - def _collection_client(self) -> RequestQueueCollectionClientAsync: - return self._api_client.request_queues() - - @property - def _default_id(self) -> str | None: - return self._configuration.default_request_queue_id - - @property - def _storage_type(self) -> type[RequestQueue]: - return RequestQueue - - @staticmethod - def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata: - return ApifyRequestQueueMetadata.model_validate(raw_metadata) - - def _get_resource_client(self, id: str) -> RequestQueueClientAsync: - # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. 
- # It should persist across migrated or resurrected Actor runs on the Apify platform. - _api_max_client_key_length = 32 - client_key = (self._configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ - :_api_max_client_key_length - ] - return self._api_client.request_queue(request_queue_id=id, client_key=client_key) diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 61c47acf..04acac1b 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -9,8 +9,8 @@ from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata +from ._utils import unique_key_to_request_id from apify import Request -from apify.storage_clients._apify._utils import unique_key_to_request_id if TYPE_CHECKING: from collections.abc import Sequence diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index 286b669a..5d8174e8 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -1,192 +1,14 @@ from __future__ import annotations -import logging import re -from abc import ABC, abstractmethod -from asyncio import Lock from base64 import b64encode from hashlib import sha256 -from logging import getLogger -from typing import TYPE_CHECKING, ClassVar, Generic, TypeVar +from typing import TYPE_CHECKING -from apify_client import ApifyClientAsync -from apify_client.clients import ( - DatasetClientAsync, - DatasetCollectionClientAsync, - KeyValueStoreClientAsync, - KeyValueStoreCollectionClientAsync, - RequestQueueClientAsync, - RequestQueueCollectionClientAsync, -) from crawlee._utils.crypto import compute_short_hash -from crawlee.storage_clients.models import ( - DatasetMetadata, - KeyValueStoreMetadata, - RequestQueueMetadata, -) - -from 
apify._configuration import Configuration -from apify.storage_clients._apify._models import ApifyKeyValueStoreMetadata if TYPE_CHECKING: - from types import TracebackType - - from crawlee.storages._base import Storage - - -logger = getLogger(__name__) - -ResourceCollectionClient = ( - KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync -) -TResourceClient = TypeVar( - 'TResourceClient', bound=KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync -) -TStorageMetadata = TypeVar('TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata) - - -class AliasResolver: - """Class for handling aliases. - - The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using - default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid - unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on - Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. - """ - - _alias_map: ClassVar[dict[str, str]] = {} - """Map containing pre-existing alias storages and their ids. Global for all instances.""" - _alias_init_lock: Lock | None = None - """Lock for creating alias storages. Only one alias storage can be created at the time. 
Global for all instances.""" - - _ALIAS_STORAGE_KEY_SEPARATOR = ',' - _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' - - def __init__(self, storage_type: type[Storage], alias: str, configuration: Configuration) -> None: - self._storage_type = storage_type - self._alias = alias - self._additional_cache_key = hash_api_base_url_and_token(configuration) - - async def __aenter__(self) -> AliasResolver: - """Context manager to prevent race condition in alias creation.""" - lock = await self._get_alias_init_lock() - await lock.acquire() - return self - - async def __aexit__( - self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: TracebackType | None - ) -> None: - lock = await self._get_alias_init_lock() - lock.release() - - @classmethod - async def _get_alias_init_lock(cls) -> Lock: - """Get lock for controlling the creation of the alias storages. - - The lock is shared for all instances of the AliasResolver class. - It is created in async method to ensure that some event loop is already running. - """ - if cls._alias_init_lock is None: - cls._alias_init_lock = Lock() - return cls._alias_init_lock - - @classmethod - async def _get_alias_map(cls) -> dict[str, str]: - """Get the aliases and storage ids mapping from the default kvs. - - Mapping is loaded from kvs only once and is shared for all instances of the AliasResolver class. - - Returns: - Map of aliases and storage ids. 
- """ - if not cls._alias_map and Configuration.get_global_configuration().is_at_home: - default_kvs_client = await _get_default_kvs_client() - - record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) - - # get_record can return {key: ..., value: ..., content_type: ...} - if isinstance(record, dict): - if 'value' in record and isinstance(record['value'], dict): - cls._alias_map = record['value'] - else: - cls._alias_map = record - else: - cls._alias_map = dict[str, str]() - - return cls._alias_map - - async def resolve_id(self) -> str | None: - """Get id of the aliased storage. - - Returns: - Storage id if it exists, None otherwise. - """ - return (await self._get_alias_map()).get(self._storage_key, None) - - async def store_mapping(self, storage_id: str) -> None: - """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" - # Update in-memory mapping - (await self._get_alias_map())[self._storage_key] = storage_id - if not Configuration.get_global_configuration().is_at_home: - logging.getLogger(__name__).debug( - 'AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' - ) - return - - default_kvs_client = await _get_default_kvs_client() - await default_kvs_client.get() - - try: - record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) - - # get_record can return {key: ..., value: ..., content_type: ...} - if isinstance(record, dict) and 'value' in record: - record = record['value'] - - # Update or create the record with the new alias mapping - if isinstance(record, dict): - record[self._storage_key] = storage_id - else: - record = {self._storage_key: storage_id} - - # Store the mapping back in the KVS. 
- await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record) - except Exception as exc: - logger.warning(f'Error storing alias mapping for {self._alias}: {exc}') - - @property - def _storage_key(self) -> str: - """Get a unique storage key used for storing the alias in the mapping.""" - return self._ALIAS_STORAGE_KEY_SEPARATOR.join( - [ - self._storage_type.__name__, - self._alias, - self._additional_cache_key, - ] - ) - - -async def _get_default_kvs_client() -> KeyValueStoreClientAsync: - """Get a client for the default key-value store.""" - configuration = Configuration.get_global_configuration() - - apify_client_async = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) - if not configuration.default_key_value_store_id: - raise ValueError("'Configuration.default_key_value_store_id' must be set.") - return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) - - -def hash_api_base_url_and_token(configuration: Configuration) -> str: - """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" - if configuration.api_public_base_url is None or configuration.token is None: - raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") - return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode()) + from apify import Configuration def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str: @@ -212,128 +34,8 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> return url_safe_key[:request_id_length] -def create_apify_client(configuration: Configuration) -> ApifyClientAsync: - """Create and return an ApifyClientAsync instance using the provided configuration.""" - if not configuration.token: - raise ValueError(f'Apify storage client requires a valid 
token in Configuration (token={configuration.token}).') - - api_url = configuration.api_base_url - if not api_url: - raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).') - - api_public_base_url = configuration.api_public_base_url - if not api_public_base_url: - raise ValueError( - 'Apify storage client requires a valid API public base URL in Configuration ' - f'(api_public_base_url={api_public_base_url}).' - ) - - # Create Apify client with the provided token and API URL. - return ApifyClientAsync( - token=configuration.token, - api_url=api_url, - api_public_url=api_public_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) - - -class ApiClientFactory(ABC, Generic[TResourceClient, TStorageMetadata]): - def __init__(self, configuration: Configuration, alias: str | None, name: str | None, id: str | None) -> None: - if sum(1 for param in [id, name, alias] if param is not None) > 1: - raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - - self._alias = alias - self._name = name - self._id = id - self._configuration = configuration - self._api_client = create_apify_client(configuration) - - async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetadata]: - match (self._alias, self._name, self._id, self._default_id): - case (None, None, None, None): - # Normalize unnamed default storage in cases where not defined in `self._default_id` to - # unnamed storage aliased as `__default__`. Used only when running locally. - return await self._open_by_alias('__default__') - - case (str(), None, None, _): - return await self._open_by_alias(self._alias) - - case (None, None, None, str()): - resource_client = self._get_resource_client(id=self._default_id) - raw_metadata = await resource_client.get() - metadata = self._get_metadata(raw_metadata) - if not raw_metadata: - # Do we want this??? Backwards compatibility, so probably yes. 
- # Default storage does not exist. Create a new one with new id. - raw_metadata = await self._collection_client.get_or_create() - metadata = self._get_metadata(raw_metadata) - resource_client = self._get_resource_client(id=metadata.id) - return resource_client, metadata - - case (None, str(), None, _): - metadata = self._get_metadata(await self._collection_client.get_or_create(name=self._name)) - # Freshly fetched named storage. No need to fetch it again. - return self._get_resource_client(id=metadata.id), metadata - - case (None, None, str(), _): - resource_client = self._get_resource_client(id=self._id) - # Fetch its metadata. - raw_metadata = await resource_client.get() - # If metadata is None, it means the storage does not exist. - if raw_metadata is None: - raise ValueError(f'Opening {self._storage_type} with id={self._id} failed.') - return resource_client, self._get_metadata(raw_metadata) - - raise RuntimeError('Unreachable code') - - @property - @abstractmethod - def _collection_client(self) -> ResourceCollectionClient: - """Get a collection API client.""" - - @property - @abstractmethod - def _default_id(self) -> str | None: - """Get a metadata model class.""" - - @property - @abstractmethod - def _storage_type(self) -> type[Storage]: - """Get a metadata model class.""" - - @staticmethod - @abstractmethod - def _get_metadata(raw_metadata: dict | None) -> TStorageMetadata: - """Get a metadata model class.""" - - async def _open_by_alias(self, alias: str) -> tuple[TResourceClient, TStorageMetadata]: - # Check if there is pre-existing alias mapping in the default KVS. - async with AliasResolver( - storage_type=self._storage_type, alias=alias, configuration=self._configuration - ) as _alias: - id = await _alias.resolve_id() - - if id: - # There was id, storage has to exist, fetch metadata to confirm it. 
- resource_client = self._get_resource_client(id=id) - raw_metadata = await resource_client.get() - if raw_metadata: - return resource_client, self._get_metadata(raw_metadata) - # If we do not raise here, we will behave same as for default storage. We create it even though it - # should exist already. Consistency or throw an error??? - - # There was no pre-existing alias in the mapping or the id did not point to existing storage. - # Create a new unnamed storage and store the alias mapping. - raw_metadata = await self._collection_client.get_or_create() - metadata = ApifyKeyValueStoreMetadata.model_validate(raw_metadata) - await _alias.store_mapping(storage_id=metadata.id) - - # Return the client for the newly created storage directly. - # It was just created, no need to refetch it. - return self._get_resource_client(id=metadata.id), self._get_metadata(raw_metadata) - - @abstractmethod - def _get_resource_client(self, id: str) -> TResourceClient: - """Get a resource API client.""" +def hash_api_base_url_and_token(configuration: Configuration) -> str: + """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" + if configuration.api_public_base_url is None or configuration.token is None: + raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") + return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode()) diff --git a/tests/integration/apify_api/conftest.py b/tests/integration/apify_api/conftest.py index ac085da3..98c2141c 100644 --- a/tests/integration/apify_api/conftest.py +++ b/tests/integration/apify_api/conftest.py @@ -9,7 +9,7 @@ from crawlee import service_locator import apify._actor -from apify.storage_clients._apify._utils import AliasResolver +from apify.storage_clients._apify._alias_resolver import AliasResolver if TYPE_CHECKING: from collections.abc import Callable diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index bd041b50..2597b3d6 
100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -17,7 +17,7 @@ import apify._actor import apify.log -from apify.storage_clients._apify._utils import AliasResolver +from apify.storage_clients._apify._alias_resolver import AliasResolver if TYPE_CHECKING: from collections.abc import Callable, Iterator From a04232fcb625b9969beeff8b3e0ba64d93c7ba55 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 12 Nov 2025 10:53:30 +0100 Subject: [PATCH 09/10] Address feedback and final polishment --- ..._alias_resolver.py => _alias_resolving.py} | 49 +++---- ...ent_factory.py => _api_client_creation.py} | 121 +++++++++--------- .../storage_clients/_apify/_dataset_client.py | 6 +- .../_apify/_key_value_store_client.py | 6 +- .../_apify/_request_queue_client.py | 6 +- tests/integration/apify_api/conftest.py | 2 +- tests/unit/conftest.py | 2 +- 7 files changed, 96 insertions(+), 96 deletions(-) rename src/apify/storage_clients/_apify/{_alias_resolver.py => _alias_resolving.py} (93%) rename src/apify/storage_clients/_apify/{_api_client_factory.py => _api_client_creation.py} (73%) diff --git a/src/apify/storage_clients/_apify/_alias_resolver.py b/src/apify/storage_clients/_apify/_alias_resolving.py similarity index 93% rename from src/apify/storage_clients/_apify/_alias_resolver.py rename to src/apify/storage_clients/_apify/_alias_resolving.py index aaba5ae5..fad2329b 100644 --- a/src/apify/storage_clients/_apify/_alias_resolver.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -3,7 +3,7 @@ import logging from asyncio import Lock from logging import getLogger -from typing import TYPE_CHECKING, ClassVar, overload +from typing import TYPE_CHECKING, ClassVar, Literal, overload from apify_client import ApifyClientAsync @@ -22,8 +22,6 @@ RequestQueueClientAsync, RequestQueueCollectionClientAsync, ) - from crawlee.storages import Dataset, KeyValueStore, RequestQueue - logger = getLogger(__name__) @@ -32,39 +30,39 @@ async def open_by_alias( *, alias: str, 
- storage_class: type[KeyValueStore], - collection_client: KeyValueStoreCollectionClientAsync, - get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], + storage_type: Literal['Dataset'], + collection_client: DatasetCollectionClientAsync, + get_resource_client_by_id: Callable[[str], DatasetClientAsync], configuration: Configuration, -) -> KeyValueStoreClientAsync: ... +) -> DatasetClientAsync: ... @overload async def open_by_alias( *, alias: str, - storage_class: type[RequestQueue], - collection_client: RequestQueueCollectionClientAsync, - get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], + storage_type: Literal['KeyValueStore'], + collection_client: KeyValueStoreCollectionClientAsync, + get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], configuration: Configuration, -) -> RequestQueueClientAsync: ... +) -> KeyValueStoreClientAsync: ... @overload async def open_by_alias( *, alias: str, - storage_class: type[Dataset], - collection_client: DatasetCollectionClientAsync, - get_resource_client_by_id: Callable[[str], DatasetClientAsync], + storage_type: Literal['RequestQueue'], + collection_client: RequestQueueCollectionClientAsync, + get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], configuration: Configuration, -) -> DatasetClientAsync: ... +) -> RequestQueueClientAsync: ... async def open_by_alias( *, alias: str, - storage_class: type[Dataset | KeyValueStore | RequestQueue], + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], collection_client: ( KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync ), @@ -78,7 +76,7 @@ async def open_by_alias( Args: alias: The alias name for the storage (e.g., '__default__', 'my-storage'). - storage_class: The storage class type (KeyValueStore, RequestQueue, or Dataset). + storage_type: The type of storage to open. collection_client: The Apify API collection client for the storage type. 
get_resource_client_by_id: A callable that takes a storage ID and returns the resource client. configuration: Configuration object containing API credentials and settings. @@ -91,7 +89,7 @@ async def open_by_alias( TypeError: If API response format is unexpected. """ async with AliasResolver( - storage_type=storage_class, + storage_type=storage_type, alias=alias, configuration=configuration, ) as alias_resolver: @@ -125,21 +123,24 @@ class AliasResolver: The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on - Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. + Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot. """ + _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' + """Key used for storing the alias mapping in the default kvs.""" + + _ALIAS_STORAGE_KEY_SEPARATOR = ',' + """Separator used in the storage key for storing the alias mapping.""" + _alias_map: ClassVar[dict[str, str]] = {} """Map containing pre-existing alias storages and their ids. Global for all instances.""" _alias_init_lock: Lock | None = None """Lock for creating alias storages. Only one alias storage can be created at the time. 
Global for all instances.""" - _ALIAS_STORAGE_KEY_SEPARATOR = ',' - _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING' - def __init__( self, - storage_type: type[Dataset | KeyValueStore | RequestQueue], + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], alias: str, configuration: Configuration, ) -> None: @@ -248,7 +249,7 @@ def _storage_key(self) -> str: """Get a unique storage key used for storing the alias in the mapping.""" return self._ALIAS_STORAGE_KEY_SEPARATOR.join( [ - self._storage_type.__name__, + self._storage_type, self._alias, self._additional_cache_key, ] diff --git a/src/apify/storage_clients/_apify/_api_client_factory.py b/src/apify/storage_clients/_apify/_api_client_creation.py similarity index 73% rename from src/apify/storage_clients/_apify/_api_client_factory.py rename to src/apify/storage_clients/_apify/_api_client_creation.py index 6ec00e0a..05869580 100644 --- a/src/apify/storage_clients/_apify/_api_client_factory.py +++ b/src/apify/storage_clients/_apify/_api_client_creation.py @@ -4,9 +4,8 @@ from apify_client import ApifyClientAsync from crawlee._utils.crypto import crypto_random_object_id -from crawlee.storages import Dataset, KeyValueStore, RequestQueue -from apify.storage_clients._apify._alias_resolver import open_by_alias +from apify.storage_clients._apify._alias_resolving import open_by_alias if TYPE_CHECKING: from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync @@ -15,54 +14,54 @@ @overload -async def create_api_client( +async def create_storage_api_client( *, - storage_type: Literal['key_value_store'], + storage_type: Literal['Dataset'], configuration: Configuration, id: str | None = None, name: str | None = None, alias: str | None = None, -) -> KeyValueStoreClientAsync: ... +) -> DatasetClientAsync: ... 
@overload -async def create_api_client( +async def create_storage_api_client( *, - storage_type: Literal['request_queue'], + storage_type: Literal['KeyValueStore'], configuration: Configuration, id: str | None = None, name: str | None = None, alias: str | None = None, -) -> RequestQueueClientAsync: ... +) -> KeyValueStoreClientAsync: ... @overload -async def create_api_client( +async def create_storage_api_client( *, - storage_type: Literal['dataset'], + storage_type: Literal['RequestQueue'], configuration: Configuration, id: str | None = None, name: str | None = None, alias: str | None = None, -) -> DatasetClientAsync: ... +) -> RequestQueueClientAsync: ... -async def create_api_client( +async def create_storage_api_client( *, - storage_type: Literal['key_value_store', 'request_queue', 'dataset'], + storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], configuration: Configuration, id: str | None = None, name: str | None = None, alias: str | None = None, ) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: - """Get an Apify storage API client. + """Get an Apify API client for a specific storage type. This function handles the creation and initialization of Apify storage clients (KVS, RQ, or Dataset). It manages authentication, storage lookup/creation, and client instantiation. Args: - storage_type: Type of storage to open ('key_value_store', 'request_queue', or 'dataset'). - configuration: Configuration object containing API credentials and settings. + storage_type: The type of storage to open. + configuration: Configuration object containing API credentials. id: Storage ID to open. Mutually exclusive with name and alias. name: Storage name (global scope, persists across runs). Mutually exclusive with id and alias. alias: Storage alias (run scope, creates unnamed storage). Mutually exclusive with id and name. 
@@ -76,43 +75,19 @@ async def create_api_client( if sum(1 for param in [id, name, alias] if param is not None) > 1: raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.') - # Validate configuration - if not configuration.token: - raise ValueError(f'Apify storage client requires a valid token in Configuration (token={configuration.token}).') - - if not configuration.api_base_url: - raise ValueError( - f'Apify storage client requires a valid API URL in Configuration (api_url={configuration.api_base_url}).' - ) - - if not configuration.api_public_base_url: - raise ValueError( - 'Apify storage client requires a valid API public base URL in Configuration ' - f'(api_public_base_url={configuration.api_public_base_url}).' - ) - - api_client = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - api_public_url=configuration.api_public_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) + apify_client = _create_api_client(configuration) # Get storage-specific configuration - if storage_type == 'key_value_store': - collection_client = api_client.key_value_stores() + if storage_type == 'KeyValueStore': + collection_client = apify_client.key_value_stores() default_id = configuration.default_key_value_store_id - storage_class = KeyValueStore def get_resource_client(storage_id: str) -> KeyValueStoreClientAsync: - return api_client.key_value_store(key_value_store_id=storage_id) + return apify_client.key_value_store(key_value_store_id=storage_id) - elif storage_type == 'request_queue': - collection_client = api_client.request_queues() # type: ignore[assignment] + elif storage_type == 'RequestQueue': + collection_client = apify_client.request_queues() # type: ignore[assignment] default_id = configuration.default_request_queue_id - storage_class = RequestQueue # type: ignore[assignment] def get_resource_client(storage_id: str) -> RequestQueueClientAsync: # type: ignore[misc] # 
Use suitable client_key to make `hadMultipleClients` response of Apify API useful. @@ -121,63 +96,87 @@ def get_resource_client(storage_id: str) -> RequestQueueClientAsync: # type: ig client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ :_api_max_client_key_length ] - return api_client.request_queue(request_queue_id=storage_id, client_key=client_key) + return apify_client.request_queue(request_queue_id=storage_id, client_key=client_key) - elif storage_type == 'dataset': - collection_client = api_client.datasets() # type: ignore[assignment] + elif storage_type == 'Dataset': + collection_client = apify_client.datasets() # type: ignore[assignment] default_id = configuration.default_dataset_id - storage_class = Dataset # type: ignore[assignment] def get_resource_client(storage_id: str) -> DatasetClientAsync: # type: ignore[misc] - return api_client.dataset(dataset_id=storage_id) + return apify_client.dataset(dataset_id=storage_id) else: raise ValueError(f'Unknown storage type: {storage_type}') # Handle different opening scenarios match (alias, name, id, default_id): + # Normalize unnamed default storage to unnamed storage aliased as `__default__`. case (None, None, None, None): - # Normalize unnamed default storage to unnamed storage aliased as `__default__`. - # Used only when running locally. return await open_by_alias( alias='__default__', - storage_class=storage_class, + storage_type=storage_type, # type: ignore[arg-type] collection_client=collection_client, get_resource_client_by_id=get_resource_client, configuration=configuration, ) + # Open by alias. case (str(), None, None, _): - # Open by alias return await open_by_alias( alias=alias, - storage_class=storage_class, + storage_type=storage_type, # type: ignore[arg-type] collection_client=collection_client, get_resource_client_by_id=get_resource_client, configuration=configuration, ) + # Open default storage. 
case (None, None, None, str()): - # Open default storage resource_client = get_resource_client(default_id) raw_metadata = await resource_client.get() + # Default storage does not exist. Create a new one. if not raw_metadata: - # Default storage does not exist. Create a new one. raw_metadata = await collection_client.get_or_create() resource_client = get_resource_client(raw_metadata['id']) return resource_client + # Open by name. case (None, str(), None, _): - # Open by name raw_metadata = await collection_client.get_or_create(name=name) return get_resource_client(raw_metadata['id']) + # Open by ID. case (None, None, str(), _): - # Open by ID resource_client = get_resource_client(id) raw_metadata = await resource_client.get() if raw_metadata is None: - raise ValueError(f'Opening {storage_class} with id={id} failed.') + raise ValueError(f'Opening {storage_type} with id={id} failed.') return resource_client raise RuntimeError('Unreachable code') + + +def _create_api_client(configuration: Configuration) -> ApifyClientAsync: + """Create and validate an ApifyClientAsync from the given Configuration.""" + if not configuration.token: + raise ValueError(f'Apify storage client requires a valid token in Configuration (token={configuration.token}).') + + if not configuration.api_base_url: + raise ValueError( + f'Apify storage client requires a valid API URL in Configuration (api_url={configuration.api_base_url}).' + ) + + if not configuration.api_public_base_url: + raise ValueError( + 'Apify storage client requires a valid API public base URL in Configuration ' + f'(api_public_base_url={configuration.api_public_base_url}).' 
+ ) + + return ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + api_public_url=configuration.api_public_base_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 60d24056..a918bddd 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -12,7 +12,7 @@ from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata -from ._api_client_factory import create_api_client +from ._api_client_creation import create_storage_api_client if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -100,8 +100,8 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. """ - api_client = await create_api_client( - storage_type='dataset', + api_client = await create_storage_api_client( + storage_type='Dataset', configuration=configuration, alias=alias, name=name, diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 54c8906a..b422b464 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -10,7 +10,7 @@ from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata -from ._api_client_factory import create_api_client +from ._api_client_creation import create_storage_api_client from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage if TYPE_CHECKING: @@ -89,8 +89,8 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in 
the configuration. """ - api_client = await create_api_client( - storage_type='key_value_store', + api_client = await create_storage_api_client( + storage_type='KeyValueStore', configuration=configuration, alias=alias, name=name, diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 86888626..be6bc5c9 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -7,7 +7,7 @@ from crawlee.storage_clients._base import RequestQueueClient -from ._api_client_factory import create_api_client +from ._api_client_creation import create_storage_api_client from ._models import ApifyRequestQueueMetadata, RequestQueueStats from ._request_queue_shared_client import ApifyRequestQueueSharedClient from ._request_queue_single_client import ApifyRequestQueueSingleClient @@ -222,8 +222,8 @@ async def open( `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available in the configuration. 
""" - api_client = await create_api_client( - storage_type='request_queue', + api_client = await create_storage_api_client( + storage_type='RequestQueue', configuration=configuration, alias=alias, name=name, diff --git a/tests/integration/apify_api/conftest.py b/tests/integration/apify_api/conftest.py index 98c2141c..d2842475 100644 --- a/tests/integration/apify_api/conftest.py +++ b/tests/integration/apify_api/conftest.py @@ -9,7 +9,7 @@ from crawlee import service_locator import apify._actor -from apify.storage_clients._apify._alias_resolver import AliasResolver +from apify.storage_clients._apify._alias_resolving import AliasResolver if TYPE_CHECKING: from collections.abc import Callable diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 2597b3d6..65a0c845 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -17,7 +17,7 @@ import apify._actor import apify.log -from apify.storage_clients._apify._alias_resolver import AliasResolver +from apify.storage_clients._apify._alias_resolving import AliasResolver if TYPE_CHECKING: from collections.abc import Callable, Iterator From 1ab1ab4b94f6969e1d35a90360f0793be708505d Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 14 Nov 2025 09:27:37 +0100 Subject: [PATCH 10/10] Align `id` handling after new storage creation --- src/apify/storage_clients/_apify/_alias_resolving.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py index fad2329b..e357333f 100644 --- a/src/apify/storage_clients/_apify/_alias_resolving.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -105,16 +105,8 @@ async def open_by_alias( # Create new unnamed storage and store alias mapping raw_metadata = await collection_client.get_or_create() - # Determine metadata class to parse the ID - if isinstance(raw_metadata, dict): - storage_id = raw_metadata.get('id') - if not 
storage_id: - raise ValueError('Failed to get storage ID from API response') - else: - raise TypeError('Unexpected API response format') - - await alias_resolver.store_mapping(storage_id=storage_id) - return get_resource_client_by_id(storage_id) + await alias_resolver.store_mapping(storage_id=raw_metadata['id']) + return get_resource_client_by_id(raw_metadata['id']) class AliasResolver: