From 0586cdb4a9a50cecdfba1ce26b33d9aefc704da3 Mon Sep 17 00:00:00 2001 From: MrCreosote Date: Fri, 27 Mar 2026 14:06:33 -0700 Subject: [PATCH] Check multipart upload checksum manually Not clear why CEPH isn't throwing an error when the provided checksum is wrong; for now we work around it by checking ourselves --- cdmtaskservice/s3/client.py | 19 ++++++++++++----- test/s3/s3_client_test.py | 41 +++++++++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/cdmtaskservice/s3/client.py b/cdmtaskservice/s3/client.py index 5a0f0f6..8c7cf4f 100644 --- a/cdmtaskservice/s3/client.py +++ b/cdmtaskservice/s3/client.py @@ -212,7 +212,7 @@ async def _fnc_wrapper(self, client, func): code = e.response["Error"]["Code"] if code == "SignatureDoesNotMatch": raise S3ClientConnectError("s3 access credentials are invalid") - if code == "404" or code == "NoSuchBucket" or code == "NoSuchKey": + if code in ("404", "NoSuchBucket", "NoSuchKey"): if bucket: raise S3BucketNotFoundError( f"The bucket '{bucket}' was not found on the s3 system" @@ -220,7 +220,7 @@ async def _fnc_wrapper(self, client, func): raise S3PathNotFoundError( f"The path '{path}' was not found on the s3 system" ) from e - if code == "AccessDenied" or code == "403": # why both? Both 403s + if code in ("AccessDenied", "403"): # why both? Both 403s # may need to add other cases here op = "Write" if write else "Read" if bucket: @@ -235,7 +235,8 @@ async def _fnc_wrapper(self, client, func): raise S3ClientConnectError( "Access denied to list buckets on the s3 system" ) from e - if code == "XAmzContentChecksumMismatch": + # BadDigest = CEPH, XAmzContentChecksumMismatch = Minio + if code in ("XAmzContentChecksumMismatch", "BadDigest"): raise S3ChecksumMismatchError(f"Checksum mismatch for upload to {path}") logger.exception( f"Unexpected response from S3. 
Response data:\n{e.response}") @@ -504,6 +505,8 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str): PartNumber=part_number, UploadId=upload_id, Body=chunk, + # Required for CEPH to check checksum is valid + ChecksumAlgorithm="CRC64NVME", ChecksumCRC64NVME=part_crc ) parts.append({ @@ -512,8 +515,7 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str): "ChecksumCRC64NVME": part_crc }) part_number += 1 - - await client.complete_multipart_upload( + complete_resp = await client.complete_multipart_upload( Bucket=buk, Key=key, UploadId=upload_id, @@ -524,6 +526,13 @@ async def _upload(self, client, buk: str, key: str, loc: Path, crc64nvme: str): except Exception: await client.abort_multipart_upload(Bucket=buk, Key=key, UploadId=upload_id) raise + # for some reason I don't understand, I can't get CEPH to reject the upload + # if the full body checksum is wrong, so we check it here. Note this will + # not be covered when testing against Minio, for example, which doesn't need this + # code + returned_crc = complete_resp["ChecksumCRC64NVME"] + if returned_crc != crc64nvme: + raise S3ChecksumMismatchError(f"Checksum mismatch for upload to {buk}/{key}") async def presign_get_urls(self, paths: S3Paths, expiration_sec: int = 3600) -> list[str]: """ diff --git a/test/s3/s3_client_test.py b/test/s3/s3_client_test.py index 7f7c4c8..ee97dce 100644 --- a/test/s3/s3_client_test.py +++ b/test/s3/s3_client_test.py @@ -660,7 +660,25 @@ async def test_upload_objects_from_file_fail_huge_file(minio, tmp_path): @pytest.mark.asyncio -async def test_upload_objects_from_file_fail_bad_crc(minio, tmp_path): +async def test_upload_objects_from_file_fail_bad_crc_single_part(minio, tmp_path): + await minio.clean() # couldn't get this to work as a fixture + await minio.create_bucket("test-bucket") + with open(tmp_path / "smallfile", mode="w") as f: + f.write("foobar") # small file, single-part upload + async with _client(minio) as cli: + 
await _upload_objects_from_file_fail( + cli, + S3Paths(["test-bucket/bar"]), + [tmp_path / "smallfile"], + ["xYZB5jZwpls="], # actual is oYZB5jZwpls= + S3ChecksumMismatchError(f"Checksum mismatch for upload to test-bucket/bar") + ) + + +@pytest.mark.asyncio +async def test_upload_objects_from_file_fail_bad_crc_multipart(minio, tmp_path): + # CEPH appears (?) to have a bug in full object CRC handling for multipart + # uploads, so the code path is different from single part await minio.clean() # couldn't get this to work as a fixture await minio.create_bucket("test-bucket") with open(tmp_path / "smallfile", mode="w") as f: @@ -670,11 +688,30 @@ async def test_upload_objects_from_file_fail_bad_crc(minio, tmp_path): cli, S3Paths(["test-bucket/bar"]), [tmp_path / "smallfile"], - ["xYZB5jZwpls="], + ["x+KiHdDNTNM="], # actual is D+KiHdDNTNM= S3ChecksumMismatchError(f"Checksum mismatch for upload to test-bucket/bar") ) +@pytest.mark.asyncio +async def test_upload_objects_from_file_fail_bad_crc_multipart_part(minio, tmp_path): + # Verifies CEPH validates per-part checksums when ChecksumAlgorithm is set on upload_part. + # Pass the correct full-body CRC so any failure must come from the per-part check. + await minio.clean() # couldn't get this to work as a fixture + await minio.create_bucket("test-bucket") + with open(tmp_path / "largefile", mode="w") as f: + f.write("0123456789abcdef" * 589824) # 9 MiB, force multipart + async with _client(minio) as cli: + with patch.object(cli, "_b64_crc64nvme", return_value="AAAAAAAAAAA="): + await _upload_objects_from_file_fail( + cli, + S3Paths(["test-bucket/bar"]), + [tmp_path / "largefile"], + ["D+KiHdDNTNM="], # correct full-body CRC + S3ChecksumMismatchError("Checksum mismatch for upload to test-bucket/bar") + ) + + @pytest.mark.asyncio async def test_upload_objects_from_file_fail_no_file(minio): await minio.clean() # couldn't get this to work as a fixture