Skip to content

Commit d737a9f

Browse files
committed
Add semaphore as argument to AsyncFileSystemWrapper
1 parent d1d3eb0 commit d737a9f

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

fsspec/implementations/asyn_wrapper.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def __init__(
5757
asynchronous=None,
5858
target_protocol=None,
5959
target_options=None,
60+
semaphore=None,
61+
max_concurrent_tasks=None,
6062
**kwargs,
6163
):
6264
if asynchronous is None:
@@ -67,7 +69,7 @@ def __init__(
6769
else:
6870
self.sync_fs = fsspec.filesystem(target_protocol, **target_options)
6971
self.protocol = self.sync_fs.protocol
70-
self.semaphore = asyncio.Semaphore(1) if not asynchronous else None
72+
self.semaphore = semaphore
7173
self._wrap_all_sync_methods()
7274

7375
@property

fsspec/implementations/tests/test_asyn_wrapper.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ def test_open(tmpdir):
199199

200200
@pytest.mark.asyncio
201201
async def test_semaphore_synchronous():
202-
fs = AsyncFileSystemWrapper(LockedFileSystem(), asynchronous=False)
202+
fs = AsyncFileSystemWrapper(
203+
LockedFileSystem(), asynchronous=False, semaphore=asyncio.Semaphore(1)
204+
)
203205

204206
paths = [f"path_{i}" for i in range(1, 3)]
205207
results = await asyncio.gather(*(fs._cat_file(path) for path in paths))
@@ -209,7 +211,7 @@ async def test_semaphore_synchronous():
209211

210212
@pytest.mark.asyncio
211213
async def test_deadlock_when_asynchronous():
212-
fs = AsyncFileSystemWrapper(LockedFileSystem(), asynchronous=True)
214+
fs = AsyncFileSystemWrapper(LockedFileSystem(), asynchronous=False, semaphore=asyncio.Semaphore(3))
213215
paths = [f"path_{i}" for i in range(1, 3)]
214216

215217
with pytest.raises(RuntimeError, match="Concurrent requests!"):

0 commit comments

Comments
 (0)