HADOOP-19604. ABFS: BlockId generation based on blockCount along with full blob md5 computation change #7777
base: trunk
Conversation
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Production code review. Will review test code in separate iteration.
 * Helper method that generates blockId.
 * @param position The offset needed to generate blockId.
 * @return String representing the block ID generated.
 * Generates a Base64-encoded block ID string based on the given position, stream ID, and desired raw length.
How did we arrive at this logic?
Is there some server side recommendation to follow this?
This logic is used across clients. They follow the pattern of UUID followed by index
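For reference, a minimal standalone sketch of that pattern (a UUID derived from the stream ID, followed by a zero-padded block index, padded or truncated to the desired raw length, then Base64-encoded). The class, method, and parameter names here are illustrative only, not the exact production code:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class BlockIdSketch {

  // Illustrative only: mirrors the "UUID followed by index" pattern discussed in this thread.
  static String generateBlockId(String streamId, long blockIndex, int rawLength) {
    UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
    String rawBlockId = String.format("%s-%06d", streamIdGuid, blockIndex);
    if (rawBlockId.length() < rawLength) {
      // Right-pad with '_' up to the desired raw length.
      rawBlockId = String.format("%-" + rawLength + "s", rawBlockId).replace(' ', '_');
    } else if (rawBlockId.length() > rawLength) {
      rawBlockId = rawBlockId.substring(0, rawLength);
    }
    return Base64.getEncoder().encodeToString(rawBlockId.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // The same stream ID and block index always yield the same block ID,
    // which is what makes PutBlock and PutBlockList consistent across clients.
    System.out.println(generateBlockId("example-stream-id", 0, 48));
    System.out.println(generateBlockId("example-stream-id", 1, 48));
  }
}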
In the comment, you have mentioned that block id is generated based on position, stream Id and raw length. Where exactly are we using position in this method? Am I missing something here?
corrected it
@@ -1097,8 +1105,10 @@ public AbfsRestOperation flush(byte[] buffer,
      AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
          contextEncryptionAdapter);
      String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
      if (!md5Hash.equals(metadataMd5)) {
        throw ex;
      if (blobMd5 != null) {
Nit: combine if statements using &&
taken
      if (rawBlockId.length() < rawLength) {
        rawBlockId = String.format("%-" + rawLength + "s", rawBlockId)
            .replace(' ', '_');
      } else if (rawBlockId.length() > rawLength) {
        rawBlockId = rawBlockId.substring(0, rawLength);
should we use ternary logic here?
that will make readability a bit difficult
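For comparison, a hypothetical ternary rewrite of the padding/truncation logic above (same rawBlockId/rawLength names as in the hunk; not proposed production code). The nested conditional is arguably harder to read than the explicit if/else:

  static String normalizeBlockId(String rawBlockId, int rawLength) {
    // Nested ternary equivalent of the if/else above - shown only to illustrate the trade-off.
    return rawBlockId.length() < rawLength
        ? String.format("%-" + rawLength + "s", rawBlockId).replace(' ', '_')
        : rawBlockId.length() > rawLength
            ? rawBlockId.substring(0, rawLength)
            : rawBlockId;
  }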
    byte[] digest = null;
    String fullBlobMd5 = null;
    try {
      // Clone the MessageDigest to avoid resetting the original state
      MessageDigest clonedMd5 = (MessageDigest) getAbfsOutputStream().getFullBlobContentMd5().clone();
      digest = clonedMd5.digest();
    } catch (CloneNotSupportedException e) {
      LOG.warn("Failed to clone MessageDigest instance", e);
    }
    if (digest != null && digest.length != 0) {
      fullBlobMd5 = Base64.getEncoder().encodeToString(digest);
    }
Since this code is common in both DFS and Blob ingress handler class, maybe we can add it as a protected helper method in the abstract class AzureIngressHandler?
taken
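A rough sketch of what the hoisted helper could look like in AzureIngressHandler, based on the snippet above and on the computeFullBlobMd5() signature that appears later in this PR (exact javadoc and placement are up to the implementation):

  /**
   * Computes the Base64-encoded MD5 of the full blob content accumulated so far,
   * without disturbing the running digest state.
   *
   * @return the Base64-encoded MD5, or null if the digest could not be computed.
   */
  protected String computeFullBlobMd5() {
    byte[] digest = null;
    try {
      // Clone so that calling digest() does not reset the original MessageDigest.
      MessageDigest clonedMd5 =
          (MessageDigest) getAbfsOutputStream().getFullBlobContentMd5().clone();
      digest = clonedMd5.digest();
    } catch (CloneNotSupportedException e) {
      LOG.warn("Failed to clone MessageDigest instance", e);
    }
    return (digest != null && digest.length != 0)
        ? Base64.getEncoder().encodeToString(digest)
        : null;
  }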
@@ -134,7 +134,7 @@ protected long getBlockCount() {
   *
   * @param blockCount the count of blocks to set
   */
  public void setBlockCount(final long blockCount) {
  protected void setBlockCount(final long blockCount) {
why this change?
The access modifier was incorrect earlier; corrected it.
@@ -1132,7 +1130,7 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
        Mockito.nullable(String.class),
        Mockito.anyString(),
        Mockito.nullable(ContextEncryptionAdapter.class),
        Mockito.any(TracingContext.class)
        Mockito.any(TracingContext.class), Mockito.anyString()
should this be Mockito.nullable(String.class) as the md5 here can be null?
taken
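For context on the suggestion: in current Mockito versions anyString() does not match a null argument, while nullable(String.class) matches both null and non-null values. A standalone illustration (not the actual ABFS test):

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.function.Function;

public class NullableMatcherDemo {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Function<String, String> f = mock(Function.class);

    when(f.apply(anyString())).thenReturn("matched-anyString");
    System.out.println(f.apply(null)); // prints null: anyString() does not match a null argument

    when(f.apply(nullable(String.class))).thenReturn("matched-nullable");
    System.out.println(f.apply(null)); // prints matched-nullable: nullable() matches null as well
  }
}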
      md = MessageDigest.getInstance(MD5);
    } catch (NoSuchAlgorithmException e) {
      // MD5 algorithm not available; md will remain null
      // Log this in production code if needed
we can remove this line
taken
    pos += appendWithOffsetHelper(os, client, path, data, fs, pos, ONE_MB);
    pos += appendWithOffsetHelper(os, client, path, data, fs, pos, MB_2);
    appendWithOffsetHelper(os, client, path, data, fs, pos, MB_4 - 1);
    pos += appendWithOffsetHelper(os, client, path, data, fs, pos, 0, getMd5(data, 0, data.length));
nit: double spaces
taken
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
@@ -42,23 +48,40 @@ public class AbfsBlobBlock extends AbfsBlock {
   * @param offset Used to generate blockId based on offset.
Add the newly added parameter to the method comment.
taken
@@ -77,6 +81,7 @@ public AppendRequestParameters(final long position,
   * @param leaseId leaseId of the blob to be appended
   * @param isExpectHeaderEnabled true if the expect header is enabled
   * @param blobParams parameters specific to append operation on Blob Endpoint.
   * @param md5 The Base64-encoded MD5 hash of the block for data integrity validation.
NIT: Format can be corrected - extra space before @param and after md5
taken
    this.blockId = generateBlockId(offset);
    this.blockIndex = blockIndex;
    String streamId = outputStream.getStreamID();
    UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
Can streamId be null? streamId.getBytes can raise a NullPointerException. Better to handle it.
streamId can never be null, as it is set in the constructor of AbfsOutputStream itself: this.outputStreamId = createOutputStreamId();
@@ -982,6 +983,9 @@ public AbfsRestOperation appendBlock(final String path,
    if (requestParameters.getLeaseId() != null) {
      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId()));
    }
    if (isChecksumValidationEnabled()) {
      addCheckSumHeaderForWrite(requestHeaders, requestParameters);
NIT: formatting required - there must be one tab at the start.
taken
@@ -1032,7 +1036,7 @@ public AbfsRestOperation flush(final String path,
      final String cachedSasToken,
      final String leaseId,
      final ContextEncryptionAdapter contextEncryptionAdapter,
      final TracingContext tracingContext) throws AzureBlobFileSystemException {
      final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
Add the new argument to the @param comments. Please make this change wherever required.
taken
@@ -1060,7 +1064,7 @@ public AbfsRestOperation flush(byte[] buffer,
      final String leaseId,
      final String eTag,
      ContextEncryptionAdapter contextEncryptionAdapter,
      final TracingContext tracingContext) throws AzureBlobFileSystemException {
      final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
Same as above.
taken
   * @param tracingContext for tracing the server calls.
   * @return executed rest operation containing response from server.
   * @throws AzureBlobFileSystemException if rest operation fails.
   * Flushes previously uploaded data to the specified path.
NIT - Format can be consistent across places.
Taken
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
    AbfsOutputStream out;
    out = Mockito.spy(new AbfsOutputStream(
can we have this in the same line?
taken
@@ -42,10 +51,28 @@
 * Test compatibility between ABFS client and WASB client.
 */
public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {

Nit: we can remove these spaces here
    return new String(Base64.encodeBase64(blockIdByteArray), StandardCharsets.UTF_8);
    UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
    long blockIndex = os.getBlockManager().getBlockCount();
    String rawBlockId = String.format("%s-%06d", streamIdGuid, blockIndex);
we can use the constant BLOCK_ID_FORMAT
Added review for the test code
   * @return The Base64-encoded MD5 checksum of the specified data, or null if the digest is empty.
   * @throws IllegalArgumentException If the offset or length is invalid for the given byte array.
   */
  public String getMd5(byte[] data, int off, int length) {
A similar method is present in production code.
Can we use that itself in tests so that production method can also be covered in test flow?
If some issue is found later, someone might just fix it here and the production code would still remain buggy.
   * @return String representing the block ID generated.
   */
  private String generateBlockId(AbfsOutputStream os, long position) {
  private String generateBlockId(AbfsOutputStream os) {
Better to use the production code in tests as well. Any issue in the code is better caught against the production method and fixed there itself.
   * @return A Base64-encoded string representing the MD5 hash of the full blob content,
   *         or {@code null} if the digest could not be computed.
   */
  protected String computeFullBlobMd5() {
Is there a unit test added around this?
          getAbfsOutputStream().getPath(), offset, ex);
      throw ex;
    } finally {
      getAbfsOutputStream().getFullBlobContentMd5().reset();
Can we add a test verifying reset happens even in case of exception?
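Not the actual ABFS test, but a minimal standalone sketch of the assertion being asked for: even when the flush path throws, the finally block resets the digest, so a fresh digest() equals the MD5 of zero bytes. The flushThatFails method is a stand-in for the ingress handler's remote flush:

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class ResetOnFailureSketch {
  public static void main(String[] args) throws NoSuchAlgorithmException {
    MessageDigest fullBlobContentMd5 = MessageDigest.getInstance("MD5");
    byte[] emptyDigest = MessageDigest.getInstance("MD5").digest();

    fullBlobContentMd5.update("some buffered block data".getBytes());
    try {
      flushThatFails(fullBlobContentMd5);
    } catch (RuntimeException expected) {
      // Even though the flush failed, the finally block must have reset the digest,
      // so a fresh digest() now equals the digest of zero bytes.
      boolean wasReset = Arrays.equals(fullBlobContentMd5.digest(), emptyDigest);
      System.out.println("digest reset after failed flush: " + wasReset);
    }
  }

  // Stand-in for the remote flush: always throws, but resets in finally,
  // mirroring the finally block in the hunk above.
  static void flushThatFails(MessageDigest fullBlobContentMd5) {
    try {
      throw new RuntimeException("simulated flush failure");
    } finally {
      fullBlobContentMd5.reset();
    }
  }
}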
@@ -1914,7 +1920,11 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
      // AzureBlobFileSystem supports only ASCII Characters in property values.
      if (isPureASCII(value)) {
        try {
          value = encodeMetadataAttribute(value);
          // URL encoding this JSON metadata, set by the WASB Client during file creation, causes compatibility issues.
Suggestion: add a test around this if not already added.
    // Scenario wise testing

    //Scenario 1: - Create and write via WASB, read via ABFS
Can we convert all these comments to javadocs for the test methods as well, including the expected outcome of each scenario?
    try {
      abfs.create(path, false);
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("AlreadyExists"));
We can add assert on exception type and status code as well.
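A hedged sketch of the stricter assertion, assuming the failure surfaces as a FileAlreadyExistsException wrapping an AbfsRestOperationException; the LambdaTestUtils.intercept idiom and HTTP 409 check are illustrative, not a confirmed part of this PR:

    // Assumes: import static org.apache.hadoop.test.LambdaTestUtils.intercept;
    // and AssertJ's org.assertj.core.api.Assertions.
    FileAlreadyExistsException ex = intercept(FileAlreadyExistsException.class,
        () -> abfs.create(path, false));
    // If the cause carries the ABFS REST failure, assert on the HTTP status as well.
    if (ex.getCause() instanceof AbfsRestOperationException) {
      Assertions.assertThat(((AbfsRestOperationException) ex.getCause()).getStatusCode())
          .isEqualTo(java.net.HttpURLConnection.HTTP_CONFLICT);
    }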
assertEquals("Wrong text from " + abfs, | ||
TEST_CONTEXT, line); | ||
} | ||
wasb.create(path, true); |
Should we assert that after creation the file length is 0?
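For example, a one-line check right after the create (assuming the plain FileSystem API):

    // A freshly (re)created file should be empty before anything is written to it.
    assertEquals("Newly created file should have length 0",
        0, wasb.getFileStatus(path).getLen());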
    // Write
    try (FSDataOutputStream nativeFsStream = abfs.create(path, true)) {
      nativeFsStream.write(TEST_CONTEXT.getBytes());
Scenario name says we need to write via ABFS. Seems like we are writing via wasb
    // Write
    wasb.create(path, true);
    try (FSDataOutputStream nativeFsStream = abfs.append(path)) {
      nativeFsStream.write(TEST_CONTEXT.getBytes());
The variable name for the stream is confusing when used alongside wasb.
   * @return Total number of files and directories.
   * @throws IOException If an error occurs while accessing the file system.
   */
  public static int listAllFilesAndDirs(FileSystem fs, Path path) throws IOException {
can we make it private?
Jira: https://issues.apache.org/jira/browse/HADOOP-19604
BlockId computation needs to be consistent across clients for PutBlock and PutBlockList, so blockCount is used instead of offset.
Block IDs were previously derived from the data offset, which could lead to inconsistency across different clients. The change now uses blockCount (i.e., the index of the block) to compute the Block ID, ensuring deterministic and consistent ID generation for both PutBlock and PutBlockList operations across clients.
Restrict URL encoding of certain JSON metadata during setXAttr calls.
When setting extended attributes (xAttrs), the JSON metadata (hdi_permission) was previously URL-encoded, which could cause unnecessary escaping or compatibility issues. This change ensures that only the required metadata is encoded.
Maintain the MD5 hash of the whole block to validate data integrity during flush.
During flush operations, the MD5 hash of the entire block's data is computed and stored. This hash is later used to validate that the block was correctly persisted, ensuring data integrity and helping detect corruption or transmission errors.
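A minimal standalone sketch of the idea (incrementally updating an MD5 digest as data is written, then Base64-encoding the accumulated digest at flush time for comparison with the service-reported Content-MD5); this is illustrative, not the ABFS implementation itself:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class FullBlobMd5Sketch {
  public static void main(String[] args) throws NoSuchAlgorithmException {
    MessageDigest fullBlobContentMd5 = MessageDigest.getInstance("MD5");

    // Each appended block updates the running digest of the whole blob.
    for (String block : new String[]{"block-0 data", "block-1 data", "block-2 data"}) {
      byte[] bytes = block.getBytes(StandardCharsets.UTF_8);
      fullBlobContentMd5.update(bytes, 0, bytes.length);
    }

    // At flush time the accumulated digest is Base64-encoded and compared against
    // the Content-MD5 the service reports for the committed blob.
    String blobMd5 = Base64.getEncoder().encodeToString(fullBlobContentMd5.digest());
    System.out.println("full blob MD5: " + blobMd5);
  }
}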