HADOOP-19645. [ABFS][ReadAheadV2] Improve Metrics for Read Calls to identify type of read done. #7837
base: trunk
Conversation
💔 -1 overall
This message was automatically generated.
:::: AGGREGATED TEST RESULT ::::
Pull Request Overview
This PR adds metrics to identify different types of read operations in the ABFS driver by enhancing the tracing header with operation-specific information. The main goal is to differentiate between various read types (direct, normal, prefetch, cache miss, footer, and small file reads) through the ClientRequestId header.
Key changes include:
- Adding a ReadType enum to categorize different read operations
- Updating the tracing header format to include versioning and operation-specific headers
- Modifying read operations throughout the codebase to set appropriate ReadType values
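The PR text does not show the enum body itself, but based on the six read types named above, a minimal sketch of what the ReadType enum could look like follows. The abbreviation strings here are assumptions for illustration, not taken from the actual change.

```java
// Hypothetical sketch of the ReadType enum; the real abbreviations used
// by the PR may differ.
public enum ReadType {
    DIRECT_READ("DR"),       // caller bypasses buffering and reads remote directly
    NORMAL_READ("NR"),       // regular buffered read
    PREFETCH_READ("PR"),     // read issued by the read-ahead machinery
    MISSEDCACHE_READ("MR"),  // read-ahead was on, but the block was not in cache
    FOOTER_READ("FR"),       // footer-optimized read (e.g. Parquet footers)
    SMALLFILE_READ("SR");    // whole small file read in one call

    private final String abbreviation;

    ReadType(String abbreviation) {
        this.abbreviation = abbreviation;
    }

    @Override
    public String toString() {
        // The abbreviated form is what gets embedded in the tracing header.
        return abbreviation;
    }
}
```

Short abbreviations keep the ClientRequestId header compact while still letting log analysis distinguish the read path taken.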
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| ReadType.java | New enum defining six read operation types with abbreviated string representations |
| AbfsHttpConstants.java | Added TracingHeaderVersion enum for header versioning |
| TracingContext.java | Enhanced header construction with versioning and operation-specific headers |
| Listener.java | Added interface method for updating ReadType |
| AbfsInputStream.java | Updated read operations to set appropriate ReadType values |
| ReadBufferWorker.java | Added imports for ReadType and TracingContext |
| TracingHeaderValidator.java | Updated validation logic for new header format |
| TestApacheHttpClientFallback.java | Fixed test assertions for new header structure |
| TestTracingContext.java | Updated header parsing for new format |
header += (":" + operatedBlobCount);
}
header += (":" + httpOperation.getTracingContextSuffix());
AbfsHttpConstants.TracingHeaderVersion.V1 + ":" +
The hardcoded V1 version is used in multiple places. Consider using TracingHeaderVersion.getCurrentVersion() consistently to centralize version management.
AbfsHttpConstants.TracingHeaderVersion.V1 + ":" +
clientCorrelationID + ":" + clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
break;
default:
//case SINGLE_ID_FORMAT
header = clientRequestId;
header =
AbfsHttpConstants.TracingHeaderVersion.V1 + ":" +
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -81,82 +85,93 @@ public TracingHeaderValidator(String clientCorrelationId, String fileSystemId,
}

private void validateTracingHeader(String tracingContextHeader) {
String[] idList = tracingContextHeader.split(":");
String[] idList = tracingContextHeader.split(":", -1);
[nitpick] Consider defining the split limit (-1) as a named constant to improve code readability and maintainability.
String[] idList = tracingContextHeader.split(":", -1);
String[] idList = tracingContextHeader.split(":", SPLIT_NO_LIMIT);
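The `-1` limit matters because `String.split` with the default limit silently discards trailing empty strings, which would lose empty fields at the end of the tracing header and shift the field count. A small standalone demo (the header value here is made up):

```java
public class SplitLimitDemo {
    // Named constant for the negative split limit, as the nitpick suggests.
    public static final int SPLIT_NO_LIMIT = -1;

    public static void main(String[] args) {
        String header = "v1:corrId:reqId:::";
        // Default limit drops trailing empty fields:
        System.out.println(header.split(":").length);                 // 3
        // Negative limit keeps them, preserving the full field count:
        System.out.println(header.split(":", SPLIT_NO_LIMIT).length); // 6
    }
}
```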
💔 -1 overall
This message was automatically generated.
@@ -128,6 +128,7 @@ public final class AbfsHttpConstants {
public static final String STAR = "*";
public static final String COMMA = ",";
public static final String COLON = ":";
public static final String HYPHEN = "-";
We already have CHAR_HYPHEN defined for this.
@@ -265,6 +286,34 @@ private String addFailureReasons(final String header,
return String.format("%s_%s", header, previousFailure);
}

private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) {
Please add javadoc to all newly added methods.
}

public int getFieldCount() {
return V1.fieldCount;
Shouldn't it be just return this.fieldCount?
+1
}

public String getVersion() {
return V1.version;
Same as above: it should be return this.version?
@@ -781,6 +794,132 @@ public void testDefaultReadaheadQueueDepth() throws Exception {
in.close();
}

@Test
public void testReadTypeInTracingContextHeader() throws Exception {
Javadoc missing.
🎊 +1 overall
This message was automatically generated.
receivedBytes = readRemote(position, b, offset, length, new TracingContext(tracingContext));
TracingContext tc = new TracingContext(tracingContext);
tc.setReadType(ReadType.MISSEDCACHE_READ);
receivedBytes = readRemote(position, b, offset, length, tc);
return receivedBytes;
} else {
LOG.debug("read ahead disabled, reading remote");
Should we add ReadType as normal read for this TracingContext as well?
@@ -442,6 +451,7 @@ private int optimisedRead(final byte[] b, final int off, final int len,
// bCursor that means the user requested data has not been read.
if (fCursor < contentLength && bCursor > limit) {
restorePointerState();
tracingContext.setReadType(ReadType.NORMAL_READ);
Before readOneBlock we're setting the TracingContext to NORMAL_READ both here and at line 439, and readOneBlock sets it to NORMAL_READ again. Do we need it twice? We could keep it just once, in the method.
+ position + COLON
+ operatedBlobCount + COLON
+ httpOperation.getTracingContextSuffix() + COLON
+ getOperationSpecificHeader(opType);
Should we keep the op-specific header before adding the HTTP client? That would group all request-related info together, followed by the network client.
E.g. .....:RE:1_EGR:NR:JDK
@@ -265,6 +286,34 @@ private String addFailureReasons(final String header,
return String.format("%s_%s", header, previousFailure);
}

private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) {
We can remove the addFailureReasons method; it has no usage now.
public enum TracingHeaderVersion {

V0("", 8),
V1("v1", 13);
Since the next versions would be V1.1/V1.2, should we consider starting with V1.0/V1.1?
And with version updates, would we update the version field in V1 only, or add a new V1.1 enum?
So every time we add a new header, we need to add a new version?
.contains(readType.toString());
}

// private testReadTypeInTracingContextHeaderInternal(ReadType readType) throws Exception {
Nit: we can remove this.
@@ -77,8 +81,7 @@ public class TracingContext {
* this field shall not be set.
*/
private String primaryRequestIdForRetry;

private Integer operatedBlobCount = null;
private Integer operatedBlobCount = 1; // Only relevant for rename-delete over blob endpoint where it will be explicitly set.
Why is it changed from null to 1?
}

public static TracingHeaderVersion getCurrentVersion() {
return V1;
This needs to be updated every time a new version is introduced; can it be fetched dynamically?
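One way to avoid hand-editing getCurrentVersion on every bump, assuming the constants stay declared oldest-to-newest, is to return the last declared constant. This is a sketch addressing the review comments above (including returning the instance fields rather than V1's), not the PR's actual code:

```java
public enum TracingHeaderVersion {
    V0("", 8),
    V1("v1", 13);

    private final String version;
    private final int fieldCount;

    TracingHeaderVersion(String version, int fieldCount) {
        this.version = version;
        this.fieldCount = fieldCount;
    }

    // Instance fields, per the review comments above.
    public String getVersion() {
        return this.version;
    }

    public int getFieldCount() {
        return this.fieldCount;
    }

    // Relies on declaration order: the last constant is the newest version,
    // so adding V2 after V1 automatically makes it current.
    public static TracingHeaderVersion getCurrentVersion() {
        TracingHeaderVersion[] all = values();
        return all[all.length - 1];
    }
}
```

The trade-off is that declaration order becomes load-bearing, which is worth a comment on the enum itself.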
header += (":" + httpOperation.getTracingContextSuffix());
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
case ALL_ID_FORMAT:
header = TracingHeaderVersion.V1.getVersion() + COLON
Should we use getCurrentVersion here?
+ streamID + COLON
+ opType + COLON
+ getRetryHeader(previousFailure, retryPolicyAbbreviation) + COLON
+ ingressHandler + COLON
These empty-string checks are needed.
break;
case TWO_ID_FORMAT:
header = clientCorrelationID + ":" + clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
header = TracingHeaderVersion.V1.getVersion() + COLON
Same as above: getCurrentVersion?
@@ -326,8 +329,8 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
}

private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) {
String[] headerContents = header.split(":");
String previousReqContext = headerContents[6];
String[] headerContents = header.split(":", SPLIT_NO_LIMIT);
Use the COLON constant here as well, since we are changing it in other places.
numOfReadCalls += 3; // 3 blocks of 1MB each.
doReturn(false).when(spiedConfig).isReadAheadV2Enabled();
doReturn(false).when(spiedConfig).isReadAheadEnabled();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, NORMAL_READ, numOfReadCalls);
Should we also verify that it is NORMAL_READ for all three calls made? Currently it only verifies via contains.
numOfReadCalls += 3;
doReturn(true).when(spiedConfig).isReadAheadEnabled();
Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, PREFETCH_READ, numOfReadCalls);
Same here: verify that 2 of the calls have PREFETCH_READ.
doReturn(true).when(spiedConfig).readSmallFilesCompletely();
doReturn(false).when(spiedConfig).optimizeFooterRead();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, numOfReadCalls);
}
One test for direct read as well?
Description of PR
JIRA: https://issues.apache.org/jira/browse/HADOOP-19645
There are a number of ways in which the ABFS driver can trigger a network call to read data. We need a way to identify what type of read call was made from the client. The plan is to add an indication for this in the already present ClientRequestId header.
Following are the types of read we want to identify: direct, normal, prefetch, cache miss, footer, and small file reads.
We will add another field in the Tracing Header (Client Request Id) for each request. We can call this field the "Operation Specific Header", very similar to how we have the "Retry Header" today. As part of this change we will only use it for read operations, keeping it empty for other operations. Moving ahead, if we need to publish any operation-specific info, the same header can be used.
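As an illustration of the idea, the versioned header with an operation-specific slot could be assembled roughly as below. The field names, ordering, and the buildHeader helper are assumptions for the sketch, not the exact layout of the PR:

```java
import java.util.StringJoiner;

public class TracingHeaderSketch {
    // Builds a colon-delimited tracing header. The operation-specific field
    // is populated only for reads and left empty for other operations.
    public static String buildHeader(String clientCorrelationId,
                                     String clientRequestId,
                                     String opType,
                                     String operationSpecificHeader) {
        StringJoiner joiner = new StringJoiner(":");
        joiner.add("v1")                       // header version prefix
              .add(clientCorrelationId)
              .add(clientRequestId)
              .add(opType)
              .add(operationSpecificHeader);   // e.g. "PR" for a prefetch read, "" otherwise
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildHeader("corr-1", "req-1", "RD", "PR")); // v1:corr-1:req-1:RD:PR
        System.out.println(buildHeader("corr-1", "req-2", "CR", ""));   // v1:corr-1:req-2:CR:
    }
}
```

Note the trailing empty field in the second example: this is exactly why the validator side needs to split with a negative limit, so empty trailing components are not dropped.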
How was this patch tested?
New tests around the changes in the Tracing Header and the introduction of the read-specific header were added.
The existing test suite was run across all combinations; results added as a comment.