Skip to content

Commit df76154

Browse files
updates
1 parent 21fb656 commit df76154

File tree

3 files changed

+29
-21
lines changed

3 files changed

+29
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Unreleased
1010
- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size
1111
- Updated default block size to be 50 MiB. Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening `AzureBlobFile` to revert to the previous 5 MiB default.
1212
- `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when `fs.open()` is called and a `block_size` is not passed in.
13+
- Added concurrency for `_async_upload_chunk`. Can be set using `max_concurrency` for `AzureBlobFileSystem`.
1314

1415

1516
2024.12.0

adlfs/spec.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,6 +2156,15 @@ def _get_chunks(self, data):
21562156
yield data[start:end]
21572157
start = end
21582158

2159+
async def _upload(self, chunk, block_id, semaphore):
2160+
async with semaphore:
2161+
async with self.container_client.get_blob_client(blob=self.blob) as bc:
2162+
await bc.stage_block(
2163+
block_id=block_id,
2164+
data=chunk,
2165+
length=len(chunk),
2166+
)
2167+
21592168
async def _async_upload_chunk(
21602169
self, final: bool = False, max_concurrency=None, **kwargs
21612170
):
@@ -2180,28 +2189,20 @@ async def _async_upload_chunk(
21802189
max_concurrency = max_concurrency or self.fs.max_concurrency or 1
21812190
semaphore = asyncio.Semaphore(max_concurrency)
21822191
tasks = []
2183-
block_ids = []
2192+
block_ids = self._block_list or []
2193+
start_idx = len(block_ids)
21842194
chunks = list(self._get_chunks(data))
21852195
for _ in range(len(chunks)):
21862196
block_ids.append(block_id)
21872197
block_id = self._get_block_id(block_ids)
2198+
21882199
if chunks:
21892200
self._block_list = block_ids
2190-
for chunk, block_id in zip(chunks, block_ids):
2191-
2192-
async def _upload_chunk(chunk=chunk, block_id=block_id):
2193-
async with semaphore:
2194-
async with self.container_client.get_blob_client(
2195-
blob=self.blob
2196-
) as bc:
2197-
await bc.stage_block(
2198-
block_id=block_id,
2199-
data=chunk,
2200-
length=len(chunk),
2201-
)
2202-
2203-
tasks.append(_upload_chunk())
2201+
for chunk, block_id in zip(chunks, block_ids[start_idx:]):
2202+
tasks.append(self._upload(chunk, block_id, semaphore))
2203+
22042204
await asyncio.gather(*tasks)
2205+
22052206
if final:
22062207
block_list = [BlobBlock(_id) for _id in self._block_list]
22072208
async with self.container_client.get_blob_client(

adlfs/tests/test_spec.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2142,20 +2142,26 @@ def test_blobfile_default_blocksize(storage):
21422142
assert f.blocksize == 50 * 2**20
21432143

21442144

2145-
@pytest.mark.parametrize("max_concurrency", [1, 2, 4, 8])
2146-
def test_large_blob_max_concurrency(storage, max_concurrency):
2145+
@pytest.mark.parametrize(
2146+
"max_concurrency, blob_size",
2147+
[
2148+
(1, 51 * 2**20),
2149+
(4, 200 * 2**20),
2150+
(4, 49 * 2**20),
2151+
],
2152+
)
2153+
def test_max_concurrency(storage, max_concurrency, blob_size):
21472154
fs = AzureBlobFileSystem(
21482155
account_name=storage.account_name,
21492156
connection_string=CONN_STR,
21502157
max_concurrency=max_concurrency,
21512158
)
2152-
blob_size = 1_120_000_000
21532159
data = os.urandom(blob_size)
21542160
fs.mkdir("large-file-container")
2155-
path = "large-file-container/blob.bin"
2161+
path = "large-file-container/blob.txt"
21562162

2157-
with fs.open(path, "wb") as dst:
2158-
dst.write(data)
2163+
with fs.open(path, "wb") as f:
2164+
f.write(data)
21592165

21602166
assert fs.exists(path)
21612167
assert fs.size(path) == blob_size

0 commit comments

Comments
 (0)