Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions cdmtaskservice/s3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ async def _fnc_wrapper(self, client, func):
code = e.response["Error"]["Code"]
if code == "SignatureDoesNotMatch":
raise S3ClientConnectError("s3 access credentials are invalid")
if code == "404" or code == "NoSuchBucket" or code == "NoSuchKey":
if code in ("404", "NoSuchBucket", "NoSuchKey"):
if bucket:
raise S3BucketNotFoundError(
f"The bucket '{bucket}' was not found on the s3 system"
)
raise S3PathNotFoundError(
f"The path '{path}' was not found on the s3 system"
) from e
if code == "AccessDenied" or code == "403": # why both? Both 403s
if code in ("AccessDenied", "403"): # why both? Both 403s
# may need to add other cases here
op = "Write" if write else "Read"
if bucket:
Expand All @@ -235,7 +235,8 @@ async def _fnc_wrapper(self, client, func):
raise S3ClientConnectError(
"Access denied to list buckets on the s3 system"
) from e
if code == "XAmzContentChecksumMismatch":
# BAdDigest = CEPH, ChecksumMismatch = Minio
if code in ("XAmzContentChecksumMismatch", "BadDigest"):
raise S3ChecksumMismatchError(f"Checksum mismatch for upload to {path}")
logger.exception(
f"Unexpected response from S3. Response data:\n{e.response}")
Expand Down Expand Up @@ -504,6 +505,8 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str):
PartNumber=part_number,
UploadId=upload_id,
Body=chunk,
# Required for CEPH to check checksum is valid
ChecksumAlgorithm="CRC64NVME",
ChecksumCRC64NVME=part_crc
)
parts.append({
Expand All @@ -512,8 +515,7 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str):
"ChecksumCRC64NVME": part_crc
})
part_number += 1

await client.complete_multipart_upload(
complete_resp = await client.complete_multipart_upload(
Bucket=buk,
Key=key,
UploadId=upload_id,
Expand All @@ -524,6 +526,13 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str):
except Exception:
await client.abort_multipart_upload(Bucket=buk, Key=key, UploadId=upload_id)
raise
# for some reason I don't understand, I can't get CEPH to reject the upload
# if the full body checksum is wrong, so we check it here. Note this will
# not be covered when testing against Minio, for example, which doesn't need this
# code
returned_crc = complete_resp["ChecksumCRC64NVME"]
if returned_crc != crc64nvme:
raise S3ChecksumMismatchError(f"Checksum mismatch for upload to {buk}/{key}")

async def presign_get_urls(self, paths: S3Paths, expiration_sec: int = 3600) -> list[str]:
"""
Expand Down
41 changes: 39 additions & 2 deletions test/s3/s3_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,25 @@ async def test_upload_objects_from_file_fail_huge_file(minio, tmp_path):


@pytest.mark.asyncio
async def test_upload_objects_from_file_fail_bad_crc(minio, tmp_path):
async def test_upload_objects_from_file_fail_bad_crc_single_part(minio, tmp_path):
await minio.clean() # couldn't get this to work as a fixture
await minio.create_bucket("test-bucket")
with open(tmp_path / "smallfile", mode="w") as f:
f.write("foobar") # small file, single-part upload
async with _client(minio) as cli:
await _upload_objects_from_file_fail(
cli,
S3Paths(["test-bucket/bar"]),
[tmp_path / "smallfile"],
["xYZB5jZwpls="], # actual is oYZB5jZwpls=
S3ChecksumMismatchError(f"Checksum mismatch for upload to test-bucket/bar")
)


@pytest.mark.asyncio
async def test_upload_objects_from_file_fail_bad_crc_multipart(minio, tmp_path):
# CEPH appears (?) to have a bug in full object CRC handling for multipart
# uploads, so the code path is different from single part
await minio.clean() # couldn't get this to work as a fixture
await minio.create_bucket("test-bucket")
with open(tmp_path / "smallfile", mode="w") as f:
Expand All @@ -670,11 +688,30 @@ async def test_upload_objects_from_file_fail_bad_crc(minio, tmp_path):
cli,
S3Paths(["test-bucket/bar"]),
[tmp_path / "smallfile"],
["xYZB5jZwpls="],
["x+KiHdDNTNM="], # actual is D+KiHdDNTNM=
S3ChecksumMismatchError(f"Checksum mismatch for upload to test-bucket/bar")
)


@pytest.mark.asyncio
async def test_upload_objects_from_file_fail_bad_crc_multipart_part(minio, tmp_path):
# Verifies CEPH validates per-part checksums when ChecksumAlgorithm is set on upload_part.
# Pass the correct full-body CRC so any failure must come from the per-part check.
await minio.clean() # couldn't get this to work as a fixture
await minio.create_bucket("test-bucket")
with open(tmp_path / "largefile", mode="w") as f:
f.write("0123456789abcdef" * 589824) # 9 MiB, force multipart
async with _client(minio) as cli:
with patch.object(cli, "_b64_crc64nvme", return_value="AAAAAAAAAAA="):
await _upload_objects_from_file_fail(
cli,
S3Paths(["test-bucket/bar"]),
[tmp_path / "largefile"],
["D+KiHdDNTNM="], # correct full-body CRC
S3ChecksumMismatchError("Checksum mismatch for upload to test-bucket/bar")
)


@pytest.mark.asyncio
async def test_upload_objects_from_file_fail_no_file(minio):
await minio.clean() # couldn't get this to work as a fixture
Expand Down
Loading