Skip to content

obstore implementations for .getsize and .getsize_prefix #3227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3227.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add lightweight implementations of .getsize() and .getsize_prefix() for ObjectStore.
47 changes: 27 additions & 20 deletions src/zarr/storage/_obstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Coroutine, Iterable
from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence
from typing import Any

from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange
Expand Down Expand Up @@ -212,41 +212,48 @@ def supports_listing(self) -> bool:
# docstring inherited
return True

def list(self) -> AsyncGenerator[str, None]:
# docstring inherited
async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]:
import obstore as obs

objects: ListStream[list[ObjectMeta]] = obs.list(self.store)
return _transform_list(objects)
objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix)
async for batch in objects:
for item in batch:
yield item

def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# return (obj async for obj in _transform_list(objects))

def list(self) -> AsyncGenerator[str, None]:
# docstring inherited
import obstore as obs
return (obj["path"] async for obj in self._list())

objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix)
return _transform_list(objects)
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
    # docstring inherited
    # Reuse the shared metadata listing and expose only each object's path.
    matching = self._list(prefix)
    return (meta["path"] async for meta in matching)

def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
import obstore as obs

coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]] = (
obs.list_with_delimiter_async(self.store, prefix=prefix)
)
return _transform_list_dir(coroutine, prefix)

async def getsize(self, key: str) -> int:
# docstring inherited
import obstore as obs

async def _transform_list(
list_stream: ListStream[list[ObjectMeta]],
) -> AsyncGenerator[str, None]:
"""
Transform the result of list into an async generator of paths.
"""
async for batch in list_stream:
for item in batch:
yield item["path"]
resp = await obs.head_async(self.store, key)
return resp["size"]

async def getsize_prefix(self, prefix: str) -> int:
    # docstring inherited
    # Accumulate while streaming: the built-in sum() cannot consume an async
    # iterator, and materialising every size first would hold the whole listing
    # of a large prefix in memory just to add it up.
    total = 0
    async for obj in self._list(prefix=prefix):
        total += obj["size"]
    return total


async def _transform_list_dir(
list_result_coroutine: Coroutine[Any, Any, ListResult[list[ObjectMeta]]], prefix: str
list_result_coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]], prefix: str
) -> AsyncGenerator[str, None]:
"""
Transform the result of list_with_delimiter into an async generator of paths.
Expand Down
15 changes: 15 additions & 0 deletions tests/test_store/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ def test_store_init_raises(self) -> None:
with pytest.raises(TypeError):
ObjectStore("path/to/store")

async def test_store_getsize(self, store: ObjectStore) -> None:
    """Check that getsize returns the byte length of a single stored value."""
    payload = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
    await self.set(store, "key", payload)
    reported = await store.getsize("key")
    assert reported == len(payload)

async def test_store_getsize_prefix(self, store: ObjectStore) -> None:
    """Check that getsize_prefix sums the sizes of the objects under a prefix."""
    payload = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
    for key in ("c/key1/0", "c/key2/0"):
        await self.set(store, key, payload)

    # The narrower prefix is expected to match only one of the two objects ...
    single = await store.getsize_prefix("c/key1")
    assert single == len(payload)

    # ... while the shared parent prefix is expected to cover both.
    combined = await store.getsize_prefix("c")
    assert combined == len(payload) * 2


@pytest.mark.slow_hypothesis
def test_zarr_hierarchy():
Expand Down