
Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather #3207


Open
wants to merge 3 commits into base: main

Conversation


@dgegen dgegen commented Jul 5, 2025

As described in #3196, I encountered issues opening Zarr v3 arrays stored over SFTP using fsspec. Specifically, Python would freeze when opening Zarr arrays.

Root Cause

The issue stems from the use of asyncio.gather in zarr.core.array.get_array_metadata, which attempts to read multiple metadata files (e.g., .zarray, .zattrs, zarr.json) concurrently. This works well for truly asynchronous filesystems, but breaks with filesystems like SFTPFileSystem, which does not appear to be concurrency-safe in async contexts (it may rely on blocking I/O internally or manage connection state via global locks). As a result, using asyncio.gather to perform multiple reads simultaneously leads to deadlocks or indefinite hangs.
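
For context, the metadata lookup follows roughly this pattern; the sketch below is simplified and illustrative (the function name and the StorePath-like object are stand-ins, not the exact zarr code):

# Simplified sketch of the problematic pattern: several metadata documents are
# requested at once via asyncio.gather. On a concurrency-safe backend this is
# fast; on an SFTP-backed filesystem the simultaneous reads can hang.
import asyncio
from typing import Any


async def get_array_metadata_sketch(store_path: Any) -> list[Any]:
    return await asyncio.gather(
        (store_path / "zarr.json").get(),
        (store_path / ".zarray").get(),
        (store_path / ".zattrs").get(),
    )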

Solution

To address this, I’ve implemented a fallback to sequential reads for filesystems that are not concurrency-safe. The logic is as follows: for non-asynchronous filesystems, the user sets store.fs.asynchronous=False. The helper function is_concurrency_safe(store_path: StorePath) -> bool checks this via getattr(fs, "asynchronous", True). If it returns True, asyncio.gather is used; otherwise we fall back to sequential awaits. This preserves the performance benefit of concurrent reads for safe filesystems (e.g., local disk, S3, GCS), while preventing deadlocks and improving robustness for backends like SFTP.
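
In code, the fallback looks roughly like the following minimal sketch; the helper's placement and the read_metadata wrapper are illustrative only, not the final implementation:

import asyncio
from typing import Any


def is_concurrency_safe(store_path: Any) -> bool:
    # fsspec filesystems expose an ``asynchronous`` flag; if the attribute is
    # missing (e.g. for non-fsspec stores) we assume concurrent reads are safe.
    fs = getattr(store_path.store, "fs", None)
    return getattr(fs, "asynchronous", True)


async def read_metadata(store_path: Any, keys: list[str]) -> list[Any]:
    coros = [(store_path / key).get() for key in keys]
    if is_concurrency_safe(store_path):
        return list(await asyncio.gather(*coros))  # concurrent reads for safe backends
    # Fall back to one read at a time for backends such as SFTP.
    return [await coro for coro in coros]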

These changes may not cover every scenario in which non-asynchronous filesystems can cause issues, as there are several other uses of asyncio.gather in zarr.core.array and zarr.core.group. However, I opted to focus on this specific problem first, as enabling arrays and groups to be opened is likely the highest priority, and I wanted to discuss this approach before making too many changes.

I look forward to hearing your thoughts and seeing this issue resolved!

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.rst
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

@github-actions github-actions bot added the needs release notes Automatically applied to PRs which haven't added release notes label Jul 5, 2025
Contributor

d-v-b commented Jul 5, 2025

Good detective work here! I think the ideal solution would keep store implementation details confined to the store classes themselves. So instead of the solution here, what if we override the get_many method on the fsspec store to include the logic you have added here, and then use that method instead of multiple gets?

Author

dgegen commented Jul 5, 2025

Very good point! Perhaps something along these lines?

class StorePath:
    # ...
    async def _is_concurrency_save(self):
        fs = getattr(self.store, "fs", None)
        return getattr(fs, "asynchronous", True)

    async def get_many(
        self,
        *suffixes: str,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ):
        tasks = [
            (self / suffix).get(prototype=prototype, byte_range=byte_range) for suffix in suffixes
        ]
        if await self._is_concurrency_save():
            return await gather(*tasks)
        else:
            results = []
            for task in tasks:
                result = await task
                results.append(result)
            return results
            
class FsspecStore:
    # ...
    async def _get_many(
        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
        if getattr(self.fs, "asynchronous", True):
            async for result in super()._get_many(requests=requests):
                yield result
        else:
            for key, prototype, byte_range in requests:
                value = await self.get(key, prototype, byte_range)
                yield (key, value)
                


codecov bot commented Jul 10, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 94.64%. Comparing base (9969a5d) to head (5338889).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3207      +/-   ##
==========================================
+ Coverage   94.62%   94.64%   +0.01%     
==========================================
  Files          78       78              
  Lines        8696     8718      +22     
==========================================
+ Hits         8229     8251      +22     
  Misses        467      467              
Files with missing lines       Coverage Δ
src/zarr/abc/store.py          95.94% <100.00%> (+0.14%) ⬆️
src/zarr/core/array.py         98.38% <100.00%> (ø)
src/zarr/core/group.py         94.81% <100.00%> (ø)
src/zarr/storage/_common.py    92.74% <100.00%> (+0.03%) ⬆️
src/zarr/storage/_fsspec.py    90.19% <100.00%> (+0.54%) ⬆️
src/zarr/testing/store.py      100.00% <100.00%> (ø)

@github-actions github-actions bot removed the needs release notes Automatically applied to PRs which haven't added release notes label Jul 10, 2025
Comment on lines 285 to 298
async def _is_concurrency_save(self):
    fs = getattr(self.store, "fs", None)
    return getattr(fs, "asynchronous", True)
Contributor

This should be a method specific to the fsspec store. The fsspec store should call it inside _get_many in order to choose which implementation to use.

Author

Done!

Comment on lines 3480 to 3482
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
Contributor

Let's bind _join_paths([path, X]) to a variable so that we don't call the _join_paths function so many times. For example:

zarray_path = _join_paths([path, ZARRAY_JSON])
...

@dgegen dgegen force-pushed the main branch 3 times, most recently from 7f72217 to 37d112e on July 16, 2025 13:10
- This pull request resolves the issue of deadlocks and indefinite hangs when
  opening Zarr v3 arrays on synchronous fsspec filesystems by implementing a
  fallback to sequential reads for non-concurrency-safe filesystems, ensuring
  robust metadata retrieval without sacrificing performance for safe
  filesystems. Furthermore, `Store._get_many` was modified to retrieve objects
  concurrently from storage; the previous implementation was sequential,
  awaiting each `self.get(*req)` before proceeding, contrary to the docstring.
- Introduced `Store._get_many_ordered` and `StorePath.get_many_ordered` to
  retrieve multiple metadata files in a single call, optimizing the retrieval
  process and reducing overhead. `StorePath.get_many_ordered` is used in
  `get_array_metadata`, and `Store._get_many_ordered` is used in
  `_read_metadata_v2`.
- Modified `FsspecStore._get_many` and `FsspecStore._get_many_ordered`
  to conditionally use `asyncio.gather` based on the concurrency safety
  of the underlying file system, enhancing compatibility with
  synchronous file systems by avoiding deadlocks when accessing metadata
  concurrently. Added tests using a `LockableFileSystem` to test
  async/sync behavior.
Author

dgegen commented Jul 16, 2025

Thanks to the feedback I received and some additional testing, I have substantially improved my initial solution. I look forward to hearing your thoughts on it! :)

PS: I'm also not sure why two of the tests failed; they don't seem to relate to any of the changes I've made.

Comment on lines +429 to +435
async def _get_many_ordered(
    self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> tuple[Buffer | None, ...]:
    """
    Retrieve a collection of objects from storage in the order they were requested.
    """
    tasks = [self.get(*req) for req in requests]
Contributor

I'm not sure I see the use for this method. If store users want fetches to happen in a specific order, then users can call get in a loop. If users only want the results of their fetches to be ordered, they can re-order the results after receiving them.

Author
@dgegen dgegen Jul 17, 2025

I introduced the method _get_many_ordered because ordered retrieval is the only use case in the current implementation. Not using it leads to repetitive boilerplate code, because we would always find ourselves awaiting all fetches and then sorting the results afterwards. Using _get_many_ordered encapsulates this logic, resulting in code that is less verbose, less repetitive, and easier to read.

Example

Compare, for example,

zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
    [
        (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
    ]
)

with

ordered_keys = [
    (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
]

retrieved_objects = {}
async for key, value in store._get_many(ordered_keys):
    retrieved_objects[key] = value

zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(retrieved_objects.get(key[0]) for key in ordered_keys)

The first block is much easier to read.

At the same time, we cannot use the original,

zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
    store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
)

as this would lead to deadlocks on synchronous file systems, and of course we also don't want to call .get sequentially, since that reduces performance on asynchronous systems.

Contributor

.get sequentially because it reduces performance in asynchronous systems.

get is async. Calling it returns a coroutine. These can be scheduled together with asyncio.gather, which will preserve order. So calling get sequentially is not a performance problem.
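
(As a stand-alone illustration of this point, independent of zarr: coroutines created one by one are still executed concurrently by gather, and the results come back in input order.)

import asyncio


async def fetch(key: str, delay: float) -> str:
    # Stand-in for a store read; the sleep simulates I/O latency.
    await asyncio.sleep(delay)
    return key


async def main() -> None:
    coros = [fetch("a", 0.3), fetch("b", 0.1), fetch("c", 0.2)]
    results = await asyncio.gather(*coros)  # runs concurrently, ~0.3 s total
    print(results)  # ['a', 'b', 'c'] -- input order preserved


asyncio.run(main())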

Author
@dgegen dgegen Jul 17, 2025

Of course, but it is a problem if we were to await them sequentially, e.g. if we instead used

zarray_bytes = await store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()) 
zgroup_bytes = await store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype())
zattrs_bytes = await store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype())

This would ensure that synchronous file systems don't run into deadlocks, but it would not be a good alternative for asynchronous systems.

Contributor
@d-v-b d-v-b Jul 17, 2025

I'm confused: aren't you doing all this exactly because the fsspec SFTP backend is not async? So then sequential awaiting (inside the logic of _get_many) is exactly what we expect to happen, no?

Author
@dgegen dgegen Jul 17, 2025

Yes, but in the general case the implementation should be asynchronous. It must, however, be implemented in such a way that we can make it synchronous by overriding store methods in the FsspecStore, turning what is generally asynchronous into something synchronous when store.fs.asynchronous == False.

Note also that, originally, we were not using asyncio.gather to wrap multiple get statements, so this was not possible. And a general sequential solution is not desirable in an I/O-limited system.

Comment on lines +418 to +427
async def _get_with_name(
    key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
) -> tuple[str, Buffer | None]:
    value = await self.get(key, prototype, byte_range)
    return key, value

tasks = [_get_with_name(*req) for req in requests]
for completed in as_completed(tasks):
    task = await completed
    yield task
Contributor

What is the advantage of this new implementation? The previous implementation was extremely simple, which I think is good for an ABC.

Author
@dgegen dgegen Jul 17, 2025

The claim in the docstring is incorrect given the previous implementation.

This loop is sequential: it awaits each self.get(*req) and yields it before moving on to the next. Each request is handled one at a time, in the exact order provided. Therefore, results are always yielded in the same order as the input requests.

It is thus not fully concurrent, which would be desirable in an I/O-limited system, and, at least as I understand it, this somewhat defeats the purpose of having an asynchronous _get_many method that yields results in the first place. If we stick to the request order, we might as well await all results and simply replace the implementation of _get_many with that of _get_many_ordered, making it faster and arguably easier to use in the asynchronous case. If we want to give the extra flexibility of not awaiting everything at once, while still issuing all requests at the same time, the new implementation would be the right one.
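
(A small stand-alone asyncio sketch, not the zarr code itself, of the behaviour the new implementation aims for: all requests in flight at once, with each result paired with its key so callers can reorder if needed.)

import asyncio


async def fetch(key: str, delay: float) -> tuple[str, float]:
    # Stand-in for a store read; the sleep simulates I/O latency.
    await asyncio.sleep(delay)
    return key, delay


async def main() -> None:
    requests = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
    coros = [fetch(key, delay) for key, delay in requests]
    # All reads are in flight at once; results arrive in completion order
    # ("b", "c", "a"), not request order, so each result carries its key.
    for finished in asyncio.as_completed(coros):
        key, delay = await finished
        print(key, delay)


asyncio.run(main())

This is the same asyncio.as_completed pattern used in the hunk above.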

Contributor

The point of the default implementation is to be the simplest possible implementation of _get_many that any child class can safely support, given an implementation of get. But child classes should also be able to override this with more efficient methods where applicable, and in those cases the order of results is not guaranteed; hence the type annotation in the original method.

Author
@dgegen dgegen Jul 17, 2025

I find this somewhat confusing, as I would have expected the default implementation to be fully asynchronous. However, if the goal is to maximize simplicity, then having an asynchronous implementation that runs sequentially might be the way to go.

That being said, if we revert this to the original, we would then only have to also remove FsspecStore._get_many from my current solution. Unless you think we should not have a _get_many_ordered method at all, use the _get_many method instead, and then always sort the values locally, since they could arrive in a different order in other implementations.
