Skip to content
88 changes: 28 additions & 60 deletions src/apify/storage_clients/_apify/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@

from typing_extensions import override

from apify_client.clients import DatasetClientAsync
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.file import json_dumps
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
from crawlee.storages import Dataset

from ._utils import AliasResolver, create_apify_client
from ._utils import ApiClientFactory

if TYPE_CHECKING:
from collections.abc import AsyncIterator

from apify_client.clients import DatasetClientAsync
from apify_client.clients import DatasetCollectionClientAsync
from crawlee._types import JsonSerializable

from apify import Configuration
Expand Down Expand Up @@ -101,66 +102,12 @@ async def open(
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
in the configuration.
"""
if sum(1 for param in [id, name, alias] if param is not None) > 1:
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')

apify_client_async = create_apify_client(configuration)
apify_datasets_client = apify_client_async.datasets()

# Normalize unnamed default storage in cases where not defined in `configuration.default_dataset_id` to unnamed
# storage aliased as `__default__`
if not any([alias, name, id, configuration.default_dataset_id]):
alias = '__default__'

if alias:
# Check if there is pre-existing alias mapping in the default KVS.
async with AliasResolver(storage_type=Dataset, alias=alias, configuration=configuration) as _alias:
id = await _alias.resolve_id()

# There was no pre-existing alias in the mapping.
# Create a new unnamed storage and store the mapping.
if id is None:
new_storage_metadata = DatasetMetadata.model_validate(
await apify_datasets_client.get_or_create(),
)
id = new_storage_metadata.id
await _alias.store_mapping(storage_id=id)

# If name is provided, get or create the storage by name.
elif name:
id = DatasetMetadata.model_validate(
await apify_datasets_client.get_or_create(name=name),
).id

# If none are provided, try to get the default storage ID from environment variables.
elif id is None:
id = configuration.default_dataset_id
if not id:
raise ValueError(
'Dataset "id", "name", or "alias" must be specified, '
'or a default dataset ID must be set in the configuration.'
)

# Now create the client for the determined ID
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

# Fetch its metadata.
metadata = await apify_dataset_client.get()

# If metadata is None, it means the storage does not exist, so we create it.
if metadata is None:
id = DatasetMetadata.model_validate(
await apify_datasets_client.get_or_create(),
).id
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

# Verify that the storage exists by fetching its metadata again.
metadata = await apify_dataset_client.get()
if metadata is None:
raise ValueError(f'Opening dataset with id={id}, name={name}, and alias={alias} failed.')
api_client, _ = await DatasetApiClientFactory(
configuration=configuration, alias=alias, name=name, id=id
).get_client_with_metadata()

return cls(
api_client=apify_dataset_client,
api_client=api_client,
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
lock=asyncio.Lock(),
)
Expand Down Expand Up @@ -309,3 +256,24 @@ async def _chunk_by_size(self, items: AsyncIterator[str]) -> AsyncIterator[str]:
last_chunk_size = payload_size + ByteSize(2) # Add 2 bytes for [] wrapper.

yield f'[{",".join(current_chunk)}]'


class DatasetApiClientFactory(ApiClientFactory[DatasetClientAsync, DatasetMetadata]):
    """Factory wiring the generic `ApiClientFactory` machinery to dataset storages.

    Supplies the dataset-specific collection client, default-ID lookup,
    storage type, metadata model, and resource-client constructor.
    """

    @property
    def _storage_type(self) -> type[Dataset]:
        """Crawlee storage class this factory serves."""
        return Dataset

    @property
    def _default_id(self) -> str | None:
        """Dataset ID configured as the default, or `None` if unset."""
        return self._configuration.default_dataset_id

    @property
    def _collection_client(self) -> DatasetCollectionClientAsync:
        """Client for the account-level dataset collection endpoints."""
        return self._api_client.datasets()

    def _get_resource_client(self, id: str) -> DatasetClientAsync:
        """Build a client scoped to the dataset with the given ID."""
        return self._api_client.dataset(dataset_id=id)

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> DatasetMetadata:
        """Validate a raw API response into a `DatasetMetadata` model."""
        return DatasetMetadata.model_validate(raw_metadata)
94 changes: 32 additions & 62 deletions src/apify/storage_clients/_apify/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@

from typing_extensions import override

from apify_client.clients import (
KeyValueStoreClientAsync,
)
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata
from crawlee.storages import KeyValueStore

from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
from ._utils import AliasResolver, create_apify_client
from ._utils import ApiClientFactory

if TYPE_CHECKING:
from collections.abc import AsyncIterator

from apify_client.clients import KeyValueStoreClientAsync
from apify_client.clients import (
KeyValueStoreCollectionClientAsync,
)

from apify import Configuration

Expand Down Expand Up @@ -90,67 +95,11 @@ async def open(
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
in the configuration.
"""
if sum(1 for param in [id, name, alias] if param is not None) > 1:
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')

apify_client_async = create_apify_client(configuration)
apify_kvss_client = apify_client_async.key_value_stores()

# Normalize unnamed default storage in cases where not defined in `configuration.default_key_value_store_id` to
# unnamed storage aliased as `__default__`
if not any([alias, name, id, configuration.default_key_value_store_id]):
alias = '__default__'

if alias:
# Check if there is pre-existing alias mapping in the default KVS.
async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias:
id = await _alias.resolve_id()

# There was no pre-existing alias in the mapping.
# Create a new unnamed storage and store the mapping.
if id is None:
# Create a new storage and store the alias mapping
new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate(
await apify_kvss_client.get_or_create(),
)
id = new_storage_metadata.id
await _alias.store_mapping(storage_id=id)

# If name is provided, get or create the storage by name.
elif name:
id = ApifyKeyValueStoreMetadata.model_validate(
await apify_kvss_client.get_or_create(name=name),
).id

# If none are provided, try to get the default storage ID from environment variables.
elif id is None:
id = configuration.default_key_value_store_id
if not id:
raise ValueError(
'KeyValueStore "id", "name", or "alias" must be specified, '
'or a default KeyValueStore ID must be set in the configuration.'
)

# Now create the client for the determined ID
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)

# Fetch its metadata.
metadata = await apify_kvs_client.get()

# If metadata is None, it means the storage does not exist, so we create it.
if metadata is None:
id = ApifyKeyValueStoreMetadata.model_validate(
await apify_kvss_client.get_or_create(),
).id
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)

# Verify that the storage exists by fetching its metadata again.
metadata = await apify_kvs_client.get()
if metadata is None:
raise ValueError(f'Opening key-value store with id={id}, name={name}, and alias={alias} failed.')

api_client, _ = await KvsApiClientFactory(
configuration=configuration, alias=alias, name=name, id=id
).get_client_with_metadata()
return cls(
api_client=apify_kvs_client,
api_client=api_client,
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
lock=asyncio.Lock(),
)
Expand Down Expand Up @@ -233,3 +182,24 @@ async def get_public_url(self, key: str) -> str:
A public URL that can be used to access the value of the given key in the KVS.
"""
return await self._api_client.get_record_public_url(key=key)


class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]):
    """Factory wiring the generic `ApiClientFactory` machinery to key-value stores.

    Supplies the KVS-specific collection client, default-ID lookup,
    storage type, metadata model, and resource-client constructor.
    """

    @property
    def _storage_type(self) -> type[KeyValueStore]:
        """Crawlee storage class this factory serves."""
        return KeyValueStore

    @property
    def _default_id(self) -> str | None:
        """Key-value store ID configured as the default, or `None` if unset."""
        return self._configuration.default_key_value_store_id

    @property
    def _collection_client(self) -> KeyValueStoreCollectionClientAsync:
        """Client for the account-level key-value store collection endpoints."""
        return self._api_client.key_value_stores()

    def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync:
        """Build a client scoped to the key-value store with the given ID."""
        return self._api_client.key_value_store(key_value_store_id=id)

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata:
        """Validate a raw API response into an `ApifyKeyValueStoreMetadata` model."""
        return ApifyKeyValueStoreMetadata.model_validate(raw_metadata)
109 changes: 39 additions & 70 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@

from typing_extensions import override

from apify_client.clients import RequestQueueClientAsync
from crawlee._utils.crypto import crypto_random_object_id
from crawlee.storage_clients._base import RequestQueueClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
from crawlee.storages import RequestQueue

from ._models import ApifyRequestQueueMetadata, RequestQueueStats
from ._request_queue_shared_client import ApifyRequestQueueSharedClient
from ._request_queue_single_client import ApifyRequestQueueSingleClient
from ._utils import AliasResolver, create_apify_client
from ._utils import ApiClientFactory

if TYPE_CHECKING:
from collections.abc import Sequence

from apify_client.clients import RequestQueueClientAsync
from apify_client.clients import RequestQueueCollectionClientAsync
from crawlee import Request
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from apify import Configuration

Expand Down Expand Up @@ -224,73 +225,14 @@ async def open(
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
in the configuration.
"""
if sum(1 for param in [id, name, alias] if param is not None) > 1:
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')

apify_client_async = create_apify_client(configuration)
apify_rqs_client = apify_client_async.request_queues()

# Normalize unnamed default storage in cases where not defined in `configuration.default_request_queue_id` to
# unnamed storage aliased as `__default__`
if not any([alias, name, id, configuration.default_request_queue_id]):
alias = '__default__'

if alias:
# Check if there is pre-existing alias mapping in the default KVS.
async with AliasResolver(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias:
id = await _alias.resolve_id()

# There was no pre-existing alias in the mapping.
# Create a new unnamed storage and store the mapping.
if id is None:
new_storage_metadata = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(),
)
id = new_storage_metadata.id
await _alias.store_mapping(storage_id=id)

# If name is provided, get or create the storage by name.
elif name:
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(name=name),
).id

# If none are provided, try to get the default storage ID from environment variables.
elif id is None:
id = configuration.default_request_queue_id
if not id:
raise ValueError(
'RequestQueue "id", "name", or "alias" must be specified, '
'or a default default_request_queue_id ID must be set in the configuration.'
)

# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
# It should persist across migrated or resurrected Actor runs on the Apify platform.
_api_max_client_key_length = 32
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
:_api_max_client_key_length
]

apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)

# Fetch its metadata.
metadata = await apify_rq_client.get()

# If metadata is None, it means the storage does not exist, so we create it.
if metadata is None:
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(),
).id
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)

# Verify that the storage exists by fetching its metadata again.
metadata = await apify_rq_client.get()
if metadata is None:
raise ValueError(f'Opening request queue with id={id}, name={name}, and alias={alias} failed.')

metadata_model = RequestQueueMetadata.model_validate(metadata)

return cls(api_client=apify_rq_client, metadata=metadata_model, access=access)
_api_client, metadata = await RqApiClientFactory(
configuration=configuration, alias=alias, name=name, id=id
).get_client_with_metadata()
return cls(
api_client=_api_client,
metadata=metadata,
access=access,
)

@override
async def purge(self) -> None:
Expand All @@ -302,3 +244,30 @@ async def purge(self) -> None:
@override
async def drop(self) -> None:
await self._api_client.delete()


class RqApiClientFactory(ApiClientFactory[RequestQueueClientAsync, ApifyRequestQueueMetadata]):
    """Factory wiring the generic `ApiClientFactory` machinery to request queues.

    Supplies the queue-specific collection client, default-ID lookup, storage
    type, metadata model, and a resource-client constructor that attaches a
    stable `client_key`.
    """

    @property
    def _storage_type(self) -> type[RequestQueue]:
        """Crawlee storage class this factory serves."""
        return RequestQueue

    @property
    def _default_id(self) -> str | None:
        """Request queue ID configured as the default, or `None` if unset."""
        return self._configuration.default_request_queue_id

    @property
    def _collection_client(self) -> RequestQueueCollectionClientAsync:
        """Client for the account-level request queue collection endpoints."""
        return self._api_client.request_queues()

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata:
        """Validate a raw API response into an `ApifyRequestQueueMetadata` model."""
        return ApifyRequestQueueMetadata.model_validate(raw_metadata)

    def _get_resource_client(self, id: str) -> RequestQueueClientAsync:
        """Build a client for the queue with the given ID, using a stable `client_key`.

        A suitable client key keeps the `hadMultipleClients` field of Apify API
        responses useful: it should persist across migrated or resurrected Actor
        runs on the Apify platform, hence the Actor run ID is preferred over a
        random fallback.
        """
        # The API caps client keys at 32 characters.
        max_key_length = 32
        run_id = self._configuration.actor_run_id
        raw_key = run_id or crypto_random_object_id(length=max_key_length)
        return self._api_client.request_queue(
            request_queue_id=id,
            client_key=raw_key[:max_key_length],
        )
Loading