DRILL-4706: Fragment planning causes Drillbits to read remote chunks when local copies are available. #639
base: master
Conversation
Updated the JIRA with details on how the current algorithm works, why remote reads were happening, and the details of the new algorithm.
@vkorukanti if you don't mind, can you review this?
Yet to review "LocalAffinityFragmentParallelizer.java"
@@ -75,6 +78,7 @@ public EndpointAffinity(final DrillbitEndpoint endpoint, final double affinity,
    this.affinity = affinity;
    this.mandatory = mandatory;
    this.maxWidth = maxWidth;
    this.numLocalWorkUnits = 0;
Not needed. By default it will always be initialized to 0
@@ -530,6 +534,7 @@ public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") lo
    this.rowGroupIndex = rowGroupIndex;
    this.rowCount = rowCount;
    this.numRecordsToRead = rowCount;
    this.preferredEndpoint = null;
Not required.
}

// Get the list of endpoints which have maximum (equal) data.
List<DrillbitEndpoint> topEndpoints = endpointByteMap.getTopEndpoints();
It took me a while to understand the algorithm below just by reading the code. It would be helpful to name the variables better here and add comments explaining the different sections. For example, renaming as below might help:
- "topEndPoints" to "maxRGDataEndPoints"
- "minBytes" to "assignedBytesOnPickedNode"
- "numBytes" to "assignedBytesOnCurrEndpoint"
- "endpoint" to "currEndpoint"
As per my understanding, lines 864 to 892 represent one section with the following logic (a simplified sketch follows this list):
- For each row group, assign a drillbit from the topEndPoints list such that the chosen one is the least loaded in terms of work units.
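For illustration, here is a minimal sketch of the "pick the least loaded of the top endpoints" step described above. The class and the String-keyed map are hypothetical simplifications (String stands in for DrillbitEndpoint); this is not the actual Drill code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified illustration of the selection step: among the
// endpoints that hold the most data for a row group, pick the one that
// currently has the fewest assigned work units, then record the assignment.
class LeastLoadedPicker {
  // endpoint -> number of work units already assigned to it
  private final Map<String, Integer> numEndpointAssignments = new HashMap<>();

  String pickEndpoint(List<String> maxRGDataEndpoints) {
    String picked = null;
    int minAssigned = Integer.MAX_VALUE;
    for (String currEndpoint : maxRGDataEndpoints) {
      int assigned = numEndpointAssignments.getOrDefault(currEndpoint, 0);
      if (assigned < minAssigned) {
        minAssigned = assigned;
        picked = currEndpoint;
      }
    }
    if (picked != null) {
      numEndpointAssignments.merge(picked, 1, Integer::sum);
    }
    return picked;
  }
}
```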
if (numEndpointAssignments.containsKey(endpoint)) {
  epAff.setNumLocalWorkUnits(numEndpointAssignments.get(endpoint));
} else {
  epAff.setNumLocalWorkUnits(0);
"else" condition is not required since by default it will be set to 0
  epAff.setNumLocalWorkUnits(0);
  }
}
Please remove extra space. Please review other places as well.
}

Integer assignment = iteratorWrapper.iter.next();
iteratorWrapper.count++;
Shouldn't we check here whether "iteratorWrapper.count" exceeds "iteratorWrapper.maxCount"?
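A hedged sketch of the kind of bound the question is asking about; the wrapper below is illustrative only and mirrors the snippet's field names rather than the actual class in the patch.

```java
import java.util.Iterator;
import java.util.List;

// Illustrative only: an iterator wrapper that refuses to hand out more
// assignments than its maxCount, which is the check being suggested above.
class BoundedIteratorWrapper {
  final Iterator<Integer> iter;
  final int maxCount;  // upper bound on assignments this endpoint may take
  int count;           // assignments handed out so far

  BoundedIteratorWrapper(List<Integer> assignments, int maxCount) {
    this.iter = assignments.iterator();
    this.maxCount = maxCount;
  }

  // Returns the next assignment, or null once the cap is reached so the
  // caller can fall back to another endpoint.
  Integer nextAssignment() {
    if (count >= maxCount || !iter.hasNext()) {
      return null;
    }
    count++;
    return iter.next();
  }
}
```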
Some initial comments.
The issue is regarding assigning fragments based on strict locality. So why is the parallelization logic affected, and not exclusively locality?
Please add unit tests; see TestHardAffinityFragmentParallelizer. Examples would simplify understanding this code.
@@ -145,6 +145,11 @@ public EndpointByteMap getByteMap() {
  public int compareTo(CompleteWork o) {
    return 0;
  }

  @Override
  public DrillbitEndpoint getPreferredEndpoint() {
Can you add a TODO here?
// 6: Finally make sure the width is at least one
width = Math.max(1, width);

List<DrillbitEndpoint> endpointPool = Lists.newArrayList();
Make this final (and use final generously wherever possible).
while(totalAssigned < width) {
  int assignedThisRound = 0;
  for (DrillbitEndpoint ep : endpointPool) {
    if (remainingEndpointAssignments.get(ep) > 0 &&
Get the value once into a local variable (and reuse it).
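A small sketch of the suggested refactor, with the map value fetched once into a local variable and reused; types are simplified (String stands in for DrillbitEndpoint) and the surrounding bookkeeping is omitted.

```java
import java.util.List;
import java.util.Map;

// Illustrative only: the lookup is done once per endpoint and reused instead
// of calling get() repeatedly inside the condition and the update.
class AssignmentRound {
  static int assignOneRound(List<String> endpointPool,
                            Map<String, Integer> remainingEndpointAssignments) {
    int assignedThisRound = 0;
    for (String ep : endpointPool) {
      final int remaining = remainingEndpointAssignments.getOrDefault(ep, 0);
      if (remaining > 0) {
        remainingEndpointAssignments.put(ep, remaining - 1);
        assignedThisRound++;
      }
    }
    return assignedThisRound;
  }
}
```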
// This is for the case where drillbits are not running on endPoints which have data.
// Allocate them from the active endpoint pool.
int totalUnAssigned =
So this parallelizer is not strictly local? Why not fail?
I got all unit and regression tests to pass with localAffinity=true; that would not be possible if this algorithm failed in that case. Also, we do this only for the case when drillbits are not running on the nodes which have the data.
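A minimal sketch of the fallback being discussed, assuming a simple round robin over the active endpoint pool; this is not the actual Drill implementation, and String stands in for DrillbitEndpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: fragments that could not be placed on data-local nodes
// (because no drillbit runs there) are handed out round robin across the
// active endpoint pool.
class FallbackAssigner {
  static List<String> assignRemaining(int totalUnassigned, List<String> activePool) {
    List<String> assignments = new ArrayList<>();
    for (int i = 0; i < totalUnassigned; i++) {
      assignments.add(activePool.get(i % activePool.size()));
    }
    return assignments;
  }
}
```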
Map<DrillbitEndpoint, Long> numAssignedBytes = Maps.newHashMap();

// Do this for 2 iterations to adjust node assignments after first iteration.
int numIterartions = 2;
Iterartions -> iterations
 * This is for Parquet Scan Fragments only. Fragment placement is done preferring strict
 * data locality.
 */
public class LocalAffinityFragmentParallelizer implements FragmentParallelizer {
When to use this vs HardAffinityFragmentParallelizer?
Updated with all review comments taken care of. Added TestLocalAffinityFragmentParallelizer.java, which has a bunch of test cases with examples.
"Some initial comments. The issue is regarding assigning fragments based on strict locality. So why is the parallelization logic affected, and not exclusively locality?" The parallelization logic is affected because it decides how many fragments to run on each node, and that is dependent on locality.
Hmm, the answer seems like a rephrasing of the question. Sorry, I misspoke. Better asked: the issue is regarding assigning work to fragments based on strict locality (deciding which fragment does what). So why is the parallelization logic (deciding how many fragments) affected?
Parallelization logic is affected for the following reasons:
  DEP3, 8,
  DEP4, 8,
  DEP5, 8);
// Expect the fragment parallelization to be 80 (16 * 5)
Wrong comment. Should be 40.
}

// Keep allocating from endpoints in a round robin fashion upto
// max(targetAllocation, maxwidthPerNode) for each endpoint and
Wrong comment. We assign until we reach the limit of maxWidthPerNode.
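For reference, a hedged sketch of round-robin allocation that stops at the maxWidthPerNode limit for each endpoint, as the corrected comment describes; names and types are simplified (String stands in for DrillbitEndpoint) and this is not the code in the patch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: keep handing out one fragment per endpoint per round,
// skipping endpoints that have reached maxWidthPerNode, until the requested
// width is reached or every endpoint is at its cap.
class CappedRoundRobin {
  static Map<String, Integer> allocate(int width, int maxWidthPerNode,
                                       List<String> endpoints) {
    Map<String, Integer> perNode = new HashMap<>();
    int totalAssigned = 0;
    while (totalAssigned < width) {
      int assignedThisRound = 0;
      for (String ep : endpoints) {
        int current = perNode.getOrDefault(ep, 0);
        if (current < maxWidthPerNode && totalAssigned < width) {
          perNode.put(ep, current + 1);
          totalAssigned++;
          assignedThisRound++;
        }
      }
      if (assignedThisRound == 0) {
        break; // every endpoint is at its cap; cannot assign more
      }
    }
    return perNode;
  }
}
```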
  40 /* globalMaxWidth */),
  ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
// The parallelization maxWidth (80) is more than globalMaxWidth(40).
// Expect the fragment parallelization to be 40 (7 + 8 + 8 + 8 + 9)
It would be great to mention that DEP5 gets 9 fragments instead of DEP4 since it has more localWorkUnits. We do favor nodes with more localWorkUnits.
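To make the 7 + 8 + 8 + 8 + 9 split concrete, here is one way such a distribution can arise: fragments are split in proportion to each endpoint's local work units, and any leftover goes to the endpoints with the most local work units. This is a hedged sketch, not the parallelizer's actual code; the names and the tie-breaking rule are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: proportional split of `width` fragments, with leftovers
// favoring endpoints that have more local work units. Assumes the total
// number of local work units is greater than zero.
class ProportionalSplit {
  static Map<String, Integer> split(int width, Map<String, Integer> localWorkUnits) {
    int totalUnits = localWorkUnits.values().stream().mapToInt(Integer::intValue).sum();
    Map<String, Integer> result = new LinkedHashMap<>();
    int assigned = 0;
    for (Map.Entry<String, Integer> e : localWorkUnits.entrySet()) {
      int share = (int) Math.floor((double) width * e.getValue() / totalUnits);
      result.put(e.getKey(), share);
      assigned += share;
    }
    // Hand leftover fragments to the endpoints with the most local work units.
    List<String> byUnitsDesc = new ArrayList<>(localWorkUnits.keySet());
    byUnitsDesc.sort((a, b) -> localWorkUnits.get(b) - localWorkUnits.get(a));
    for (int i = 0; assigned < width; i = (i + 1) % byUnitsDesc.size()) {
      String ep = byUnitsDesc.get(i);
      result.put(ep, result.get(ep) + 1);
      assigned++;
    }
    return result;
  }
}
```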
…when local copies are available. New fragment placement algorithm based on locality of data.
Merged with latest code. All review comments taken care of. All tests pass with the option
@ppadma was this merged? I don't see a
Even though it is old, this PR is still very much relevant and a useful feature to have in Drill for certain use cases/scenarios.