Commit d73bf2b

feat(_put_file): dynamically adjust chunksize based on file size
`_put_file` will automatically increase the `chunksize` when uploading a large file so that the upload stays within S3's 10,000-part limit. Fixes #971.
1 parent ec57f88 commit d73bf2b

File tree

1 file changed: +130 -66 lines changed


s3fs/core.py

Lines changed: 130 additions & 66 deletions
@@ -1,12 +1,13 @@
 # -*- coding: utf-8 -*-
+import math
 import asyncio
 import errno
 import io
 import logging
 import mimetypes
 import os
 import socket
-from typing import Tuple, Optional
+from typing import IO, Tuple, Optional
 import weakref
 import re
 

@@ -68,6 +69,9 @@ def setup_logging(level=None):
     FSTimeoutError,
     ResponseParserError,
 )
+MIN_CHUNK_SIZE = 5 * 2**20  # minimum part size for multipart upload is 5MiB
+MAX_CHUNK_SIZE = 5 * 2**30  # maximum part size for S3 multipart upload is 5GiB
+MAX_UPLOAD_PARTS = 10_000  # maximum number of parts for S3 multipart upload
 
 if ClientPayloadError is not None:
     S3_RETRYABLE_ERRORS += (ClientPayloadError,)
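
These constants track S3's documented multipart-upload limits (5 MiB minimum part size except for the final part, 5 GiB maximum part size, at most 10,000 parts per upload). As a quick standalone sanity check of what those limits imply, not part of the diff itself:

    MIN_CHUNK_SIZE = 5 * 2**20
    MAX_CHUNK_SIZE = 5 * 2**30
    MAX_UPLOAD_PARTS = 10_000

    # Largest object reachable with the previous fixed 50 MiB part size:
    print(MAX_UPLOAD_PARTS * 50 * 2**20 / 2**30)      # ~488 GiB

    # Ceiling imposed by S3 itself (10,000 parts of at most 5 GiB each),
    # comfortably above S3's 5 TiB maximum object size:
    print(MAX_UPLOAD_PARTS * MAX_CHUNK_SIZE / 2**40)  # ~48.8 TiB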
@@ -1230,7 +1234,7 @@ async def _put_file(
         lpath,
         rpath,
         callback=_DEFAULT_CALLBACK,
-        chunksize=50 * 2**20,
+        chunksize=None,
         max_concurrency=None,
         mode="overwrite",
         **kwargs,
@@ -1258,43 +1262,47 @@ async def _put_file(
         if content_type is not None:
             kwargs["ContentType"] = content_type
 
-        with open(lpath, "rb") as f0:
-            if size < min(5 * 2**30, 2 * chunksize):
-                chunk = f0.read()
+        if size < min(5 * 2**30, 2 * (chunksize or 5 * 2**20)):
+            with open(lpath, "rb") as f0:
                 await self._call_s3(
-                    "put_object", Bucket=bucket, Key=key, Body=chunk, **kwargs, **match
+                    "put_object",
+                    Bucket=bucket,
+                    Key=key,
+                    Body=f0,
+                    **kwargs,
+                    **match,
                 )
                 callback.relative_update(size)
-            else:
-
-                mpu = await self._call_s3(
-                    "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
+        else:
+            mpu = await self._call_s3(
+                "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
+            )
+            try:
+                out = await self._upload_file_part_concurrent(
+                    bucket,
+                    key,
+                    mpu,
+                    lpath,
+                    size,
+                    callback=callback,
+                    chunksize=chunksize,
+                    max_concurrency=max_concurrency,
                 )
-                try:
-                    out = await self._upload_file_part_concurrent(
-                        bucket,
-                        key,
-                        mpu,
-                        f0,
-                        callback=callback,
-                        chunksize=chunksize,
-                        max_concurrency=max_concurrency,
-                    )
-                    parts = [
-                        {"PartNumber": i + 1, "ETag": o["ETag"]}
-                        for i, o in enumerate(out)
-                    ]
-                    await self._call_s3(
-                        "complete_multipart_upload",
-                        Bucket=bucket,
-                        Key=key,
-                        UploadId=mpu["UploadId"],
-                        MultipartUpload={"Parts": parts},
-                        **match,
-                    )
-                except Exception:
-                    await self._abort_mpu(bucket, key, mpu["UploadId"])
-                    raise
+                parts = [
+                    {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
+                ]
+                await self._call_s3(
+                    "complete_multipart_upload",
+                    Bucket=bucket,
+                    Key=key,
+                    UploadId=mpu["UploadId"],
+                    MultipartUpload={"Parts": parts},
+                    **match,
+                )
+            except Exception:
+                await self._abort_mpu(bucket, key, mpu["UploadId"])
+                raise
+
         while rpath:
             self.invalidate_cache(rpath)
             rpath = self._parent(rpath)
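
For orientation, a minimal usage sketch of how this path is typically reached, assuming the usual fsspec-style surface where the synchronous `put_file` wrapper forwards keyword arguments such as `chunksize` to `_put_file`; the bucket and file names below are made up:

    import s3fs

    fs = s3fs.S3FileSystem()

    # chunksize now defaults to None: files below min(5 GiB, 2 * 5 MiB) = 10 MiB
    # go through a single put_object call, everything larger becomes a
    # multipart upload whose part size is chosen automatically.
    fs.put_file("large-local-file.bin", "example-bucket/large-object.bin")

    # An explicit chunksize is still honoured, but (as the next hunk shows)
    # it is raised when it would otherwise require more than 10,000 parts.
    fs.put_file(
        "large-local-file.bin",
        "example-bucket/large-object.bin",
        chunksize=64 * 2**20,
    )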
@@ -1304,45 +1312,51 @@ async def _upload_file_part_concurrent(
         bucket,
         key,
         mpu,
-        f0,
+        path,
+        filesize,
         callback=_DEFAULT_CALLBACK,
-        chunksize=50 * 2**20,
+        chunksize=None,
         max_concurrency=None,
     ):
         max_concurrency = max_concurrency or self.max_concurrency
         if max_concurrency < 1:
             raise ValueError("max_concurrency must be >= 1")
 
-        async def _upload_chunk(chunk, part_number):
-            result = await self._call_s3(
-                "upload_part",
-                Bucket=bucket,
-                PartNumber=part_number,
-                UploadId=mpu["UploadId"],
-                Body=chunk,
-                Key=key,
-            )
-            callback.relative_update(len(chunk))
-            return result
-
-        out = []
-        while True:
-            chunks = []
-            for i in range(max_concurrency):
-                chunk = f0.read(chunksize)
-                if chunk:
-                    chunks.append(chunk)
-            if not chunks:
-                break
-            out.extend(
-                await asyncio.gather(
-                    *[
-                        _upload_chunk(chunk, len(out) + i)
-                        for i, chunk in enumerate(chunks, 1)
-                    ]
+        default_chunksize = 50 * 2**20  # 50 MiB
+        chunksize = max(chunksize or default_chunksize, MIN_CHUNK_SIZE)
+        required_chunks = math.ceil(filesize / chunksize)
+        # adjust chunksize to fit within the MAX_UPLOAD_PARTS limit
+        if required_chunks > MAX_UPLOAD_PARTS:
+            chunksize = math.ceil(filesize / MAX_UPLOAD_PARTS)
+
+        num_parts = math.ceil(filesize / chunksize)
+        logger.debug(
+            "uploading %d parts with a chunksize of %d and a concurrency of %d",
+            num_parts,
+            chunksize,
+            max_concurrency,
+        )
+
+        async def _upload_part(part_number):
+            with open(path, mode="rb") as f:
+                start = chunksize * (part_number - 1)
+                f.seek(start)
+                end = min(start + chunksize, filesize)
+                size = end - start
+                file_chunk = FileChunk(f, start, size, f.name)
+                result = await self._call_s3(
+                    "upload_part",
+                    Bucket=bucket,
+                    PartNumber=part_number,
+                    UploadId=mpu["UploadId"],
+                    Body=file_chunk,
+                    Key=key,
                 )
-            )
-        return out
+            callback.relative_update(size)
+            return result
+
+        coros = [_upload_part(part_number) for part_number in range(1, num_parts + 1)]
+        return await _run_coros_in_chunks(coros, batch_size=max_concurrency)
 
     async def _get_file(
         self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None, **kwargs
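
To make the sizing behaviour concrete, here is a small standalone sketch that mirrors the arithmetic in the hunk above (illustration only; `pick_part_size` is a hypothetical helper, not library code):

    import math

    MIN_CHUNK_SIZE = 5 * 2**20
    MAX_UPLOAD_PARTS = 10_000
    DEFAULT_CHUNKSIZE = 50 * 2**20

    def pick_part_size(filesize, chunksize=None):
        # reproduces the adjustment done in _upload_file_part_concurrent
        chunksize = max(chunksize or DEFAULT_CHUNKSIZE, MIN_CHUNK_SIZE)
        if math.ceil(filesize / chunksize) > MAX_UPLOAD_PARTS:
            chunksize = math.ceil(filesize / MAX_UPLOAD_PARTS)
        return chunksize, math.ceil(filesize / chunksize)

    # 100 GiB: 50 MiB parts already fit (2,048 parts), so the default is kept.
    print(pick_part_size(100 * 2**30))  # (52428800, 2048)

    # 1 TiB: 50 MiB parts would need 20,972 parts, so the part size is raised
    # to ceil(1 TiB / 10,000) = 109,951,163 bytes (~105 MiB) for 10,000 parts.
    print(pick_part_size(2**40))        # (109951163, 10000)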
@@ -2566,3 +2580,53 @@ async def _call_and_read():
             resp["Body"].close()
 
     return await _error_wrapper(_call_and_read, retries=fs.retries)
+
+
+class FileChunk(io.RawIOBase):
+    def __init__(self, fileobj: IO[bytes], offset: int, size: int, name: str):
+        self.fileobj = fileobj
+        self.offset = offset
+        self.size = size
+        self.position = 0
+        self.name = name
+
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return self.fileobj.seekable()
+
+    def tell(self):
+        return self.position
+
+    def seek(self, position, whence=io.SEEK_SET):
+        if whence == io.SEEK_SET:
+            self.position = min(max(position, 0), self.size)
+        elif whence == io.SEEK_CUR:
+            if position < 0:
+                self.position = max(self.position + position, 0)
+            else:
+                self.position = min(self.position + position, self.size)
+        elif whence == io.SEEK_END:
+            self.position = max(min(self.size + position, self.size), 0)
+        else:
+            raise ValueError("Invalid argument")
+        self.fileobj.seek(self.offset + self.position)
+        return self.position
+
+    def readinto(self, b):
+        max_size = self.size - self.position
+        if max_size <= 0:
+            return 0
+
+        if len(b) > max_size:
+            b = memoryview(b)[:max_size]
+        res = self.fileobj.readinto(b)
+        if res != len(b):
+            raise RuntimeError("unexpected end of data")
+        self.position += res
+        assert self.fileobj.tell() == (self.offset + self.position)
+        return res
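
The new FileChunk class presents a fixed window of an already-open file as an independent, read-only file-like object, which is what lets each part be streamed to upload_part without loading the whole chunk into memory. A minimal standalone illustration of its behaviour (using an in-memory buffer rather than a real file; the import path assumes the class stays module-level in s3fs/core.py):

    import io
    from s3fs.core import FileChunk

    buf = io.BytesIO(b"0123456789")
    chunk = FileChunk(buf, offset=4, size=4, name="example")

    chunk.seek(0)                   # positions the underlying buffer at offset 4
    assert chunk.read() == b"4567"  # io.RawIOBase.read() is driven by readinto()

    chunk.seek(2)
    assert chunk.read() == b"67"    # reads never run past offset + size

Because each _upload_part coroutine opens its own handle on the local path and seeks to its slice, concurrent parts no longer share a single file position the way the old f0.read(chunksize) loop did.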
