Skip to content

Commit 38e9a73

Browse files
committed
Addressing Review comments.
1 parent 1517a87 commit 38e9a73

20 files changed

+389
-480
lines changed

exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java

+32-11
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,38 @@ public static DrillNode create(DrillbitEndpoint endpoint) {
3535
return new DrillNode(endpoint);
3636
}
3737

38-
public boolean equals(Object other) {
39-
if (!(other instanceof DrillNode)) {
40-
return false;
38+
public boolean equals(Object obj) {
39+
if (obj == this) {
40+
return true;
4141
}
42+
if (!(obj instanceof org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint)) {
43+
return super.equals(obj);
44+
}
45+
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint other = (org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint) obj;
4246

43-
DrillbitEndpoint otherEndpoint = ((DrillNode) other).endpoint;
44-
return endpoint.getAddress().equals(otherEndpoint.getAddress()) &&
45-
endpoint.getUserPort() == otherEndpoint.getUserPort() &&
46-
endpoint.getControlPort() == otherEndpoint.getControlPort() &&
47-
endpoint.getDataPort() == otherEndpoint.getDataPort() &&
48-
endpoint.getVersion().equals(otherEndpoint.getVersion());
47+
boolean result = true;
48+
result = result && (endpoint.hasAddress() == other.hasAddress());
49+
if (endpoint.hasAddress()) {
50+
result = result && endpoint.getAddress()
51+
.equals(other.getAddress());
52+
}
53+
result = result && (endpoint.hasUserPort() == other.hasUserPort());
54+
if (endpoint.hasUserPort()) {
55+
result = result && (endpoint.getUserPort() == other.getUserPort());
56+
}
57+
result = result && (endpoint.hasControlPort() == other.hasControlPort());
58+
if (endpoint.hasControlPort()) {
59+
result = result && (endpoint.getControlPort() == other.getControlPort());
60+
}
61+
result = result && (endpoint.hasDataPort() == other.hasDataPort());
62+
if (endpoint.hasDataPort()) {
63+
result = result && (endpoint.getDataPort() == other.getDataPort());
64+
}
65+
result = result && (endpoint.hasVersion() == other.hasVersion());
66+
if (endpoint.hasVersion()) {
67+
result = result && endpoint.getVersion().equals(other.getVersion());
68+
}
69+
return result;
4970
}
5071

5172
@Override
@@ -81,8 +102,8 @@ public String toString() {
81102
StringBuilder sb = new StringBuilder();
82103

83104
return sb.append("endpoint address :")
84-
.append(endpoint.getAddress())
105+
.append(endpoint.hasAddress() ? endpoint.getAddress() : "no-address")
85106
.append("endpoint user port: ")
86-
.append(endpoint.getUserPort()).toString();
107+
.append(endpoint.hasUserPort() ? endpoint.getUserPort() : "no-userport").toString();
87108
}
88109
}

exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) {
8888
}
8989

9090
@VisibleForTesting
91-
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
91+
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long optimalMemAllocation) {
9292
super();
9393
this.allocator = allocator;
9494
this.operatorId = operatorId;
@@ -97,7 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll
9797
this.recordsReceivedByInput = new long[inputCount];
9898
this.batchesReceivedByInput = new long[inputCount];
9999
this.schemaCountByInput = new long[inputCount];
100-
this.optimalMemoryAllocation = initialAllocation;
100+
this.optimalMemoryAllocation = optimalMemAllocation;
101101
}
102102

103103
private String assertionError(String msg){
@@ -208,7 +208,7 @@ public OperatorProfile getProfile() {
208208
.setOperatorId(operatorId) //
209209
.setSetupNanos(setupNanos) //
210210
.setProcessNanos(processingNanos)
211-
.setOptimalMemAllocation(optimalMemoryAllocation)
211+
.setMaxAllocation(optimalMemoryAllocation)
212212
.setWaitNanos(waitNanos);
213213

214214
if (allocator != null) {

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java

+9-11
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* fragment is based on the cluster state and provided queue configuration.
4848
*/
4949
public class DistributedQueueParallelizer extends SimpleParallelizer {
50-
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
50+
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
5151
private final boolean planHasMemory;
5252
private final QueryContext queryContext;
5353
private final QueryResourceManager rm;
@@ -62,23 +62,21 @@ public DistributedQueueParallelizer(boolean memoryPlanning, QueryContext queryCo
6262
}
6363

6464
// return the memory computed for a physical operator on a drillbitendpoint.
65+
// At this stage buffered operator memory could have been reduced depending upon
66+
// the selected queue limits.
6567
public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
6668
return (endpoint, operator) -> {
69+
long operatorsMemory = operator.getMaxAllocation();
6770
if (!planHasMemory) {
68-
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
6971
if (operator.isBufferedOperator(queryContext)) {
70-
Long operatorsMemory = operators.get(drillEndpointNode).get(operator);
71-
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
72-
return operatorsMemory;
72+
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
73+
operatorsMemory = operators.get(drillEndpointNode).get(operator);
7374
} else {
74-
Long nonBufferedMemory = (long)operator.getCost().getMemoryCost();
75-
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory);
76-
return nonBufferedMemory;
75+
operatorsMemory = (long)operator.getCost().getMemoryCost();
7776
}
7877
}
79-
else {
80-
return operator.getMaxAllocation();
81-
}
78+
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
79+
return operatorsMemory;
8280
};
8381
}
8482

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java

+2-18
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.drill.exec.planner.fragment;
1919

2020
import org.apache.drill.exec.ops.QueryContext;
21-
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
2221
import org.apache.drill.exec.physical.base.Exchange;
2322
import org.apache.drill.exec.physical.base.PhysicalOperator;
23+
import org.apache.drill.exec.util.memory.MemoryAllocationUtilities;
2424
import org.apache.drill.exec.work.foreman.ForemanSetupException;
2525
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
2626

@@ -167,27 +167,11 @@ public boolean equals(Object obj) {
167167

168168
public List<PhysicalOperator> getBufferedOperators(QueryContext queryContext) {
169169
List<PhysicalOperator> bufferedOps = new ArrayList<>();
170-
root.accept(new BufferedOpFinder(queryContext), bufferedOps);
170+
root.accept(new MemoryAllocationUtilities.BufferedOpFinder(queryContext), bufferedOps);
171171
return bufferedOps;
172172
}
173173

174-
protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
175-
private final QueryContext context;
176174

177-
public BufferedOpFinder(QueryContext queryContext) {
178-
this.context = queryContext;
179-
}
180-
181-
@Override
182-
public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
183-
throws RuntimeException {
184-
if (op.isBufferedOperator(context)) {
185-
value.add(op);
186-
}
187-
visitChildren(op, value);
188-
return null;
189-
}
190-
}
191175

192176
@Override
193177
public String toString() {

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeExce
4949
// List of all the buffered operators and their memory requirement per drillbit.
5050
private final Map<DrillNode, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
5151
private final QueryContext queryContext;
52-
private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS;
52+
private final long minimum_memory_for_buffer_opers;
5353

5454
public MemoryCalculator(PlanningSet planningSet, QueryContext context, long minMemory) {
5555
this.planningSet = planningSet;
5656
this.bufferedOperators = new HashMap<>();
5757
this.queryContext = context;
58-
this.MINIMUM_MEMORY_FOR_BUFFER_OPERS = minMemory;
58+
this.minimum_memory_for_buffer_opers = minMemory;
5959
}
6060

6161
// Helper method to compute the minor fragment count per drillbit. This method returns
@@ -138,7 +138,7 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
138138
// The memory estimates of the optimizer are for the whole operator spread across all the
139139
// minor fragments. Divide this memory estimation by fragment width to get the memory
140140
// requirement per minor fragment.
141-
long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), MINIMUM_MEMORY_FOR_BUFFER_OPERS);
141+
long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), minimum_memory_for_buffer_opers);
142142
Map<DrillNode, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment);
143143

144144
Map<DrillNode,

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint fo
338338
PlanFragment fragment = PlanFragment.newBuilder()
339339
.setForeman(foremanNode)
340340
.setHandle(handle)
341-
.setEndpointUUID(endpointUUID)
341+
.setAssignedEndpointUUID(endpointUUID)
342342
.setAssignment(endpoint)
343343
.setLeafFragment(isLeafFragment)
344344
.setContext(queryContextInfo)

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.drill.exec.planner.fragment;
1919

20+
import org.apache.drill.common.DrillNode;
2021
import org.apache.drill.common.util.function.CheckedConsumer;
2122
import org.apache.drill.exec.ops.QueryContext;
2223
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -26,9 +27,10 @@
2627
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
2728
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
2829
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
29-
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
30-
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
30+
31+
import java.util.ArrayList;
3132
import java.util.Collection;
33+
import java.util.List;
3234
import java.util.Map;
3335
import java.util.HashMap;
3436
import java.util.Set;
@@ -69,15 +71,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
6971

7072

7173
public class Collector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
72-
private final Multimap<DrillbitEndpoint, PhysicalOperator> bufferedOperators;
74+
private final Map<DrillNode, List<PhysicalOperator>> bufferedOperators;
7375

7476
public Collector() {
75-
this.bufferedOperators = ArrayListMultimap.create();
77+
this.bufferedOperators = new HashMap<>();
7678
}
7779

7880
private void getMinorFragCountPerDrillbit(Wrapper currFragment, PhysicalOperator operator) {
7981
for (DrillbitEndpoint endpoint : currFragment.getAssignedEndpoints()) {
80-
bufferedOperators.put(endpoint, operator);
82+
DrillNode node = new DrillNode(endpoint);
83+
bufferedOperators.putIfAbsent(node, new ArrayList<>());
84+
bufferedOperators.get(node).add(operator);
8185
}
8286
}
8387

@@ -103,10 +107,10 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
103107
}
104108

105109
public Map<String, Collection<PhysicalOperator>> getNodeMap() {
106-
Map<DrillbitEndpoint, Collection<PhysicalOperator>> endpointCollectionMap = bufferedOperators.asMap();
110+
Map<DrillNode, List<PhysicalOperator>> endpointCollectionMap = bufferedOperators;
107111
Map<String, Collection<PhysicalOperator>> nodeMap = new HashMap<>();
108-
for (Map.Entry<DrillbitEndpoint, Collection<PhysicalOperator>> entry : endpointCollectionMap.entrySet()) {
109-
nodeMap.put(entry.getKey().getAddress(), entry.getValue());
112+
for (Map.Entry<DrillNode, List<PhysicalOperator>> entry : endpointCollectionMap.entrySet()) {
113+
nodeMap.put(entry.getKey().toString(), entry.getValue());
110114
}
111115

112116
return nodeMap;

exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,18 @@ public class NodeResources {
4444

4545
private long memoryInBytes;
4646

47-
private long numVirtualCpu;
47+
private int numVirtualCpu;
4848

4949
private static final int CURRENT_VERSION = 1;
5050

51-
public NodeResources(long memoryInBytes, long numVirtualCpu) {
51+
public NodeResources(long memoryInBytes, int numVirtualCpu) {
5252
this(CURRENT_VERSION, memoryInBytes, numVirtualCpu);
5353
}
5454

5555
@JsonCreator
5656
public NodeResources(@JsonProperty("version") int version,
5757
@JsonProperty("memoryInBytes") long memoryInBytes,
58-
@JsonProperty("numVirtualCpu") long numVirtualCpu) {
58+
@JsonProperty("numVirtualCpu") int numVirtualCpu) {
5959
this.version = version;
6060
this.memoryInBytes = memoryInBytes;
6161
this.numVirtualCpu = numVirtualCpu;
@@ -79,7 +79,7 @@ public long getMemoryInGB() {
7979
return Math.round(getMemoryInMB() / 1024L);
8080
}
8181

82-
public long getNumVirtualCpu() {
82+
public int getNumVirtualCpu() {
8383
return numVirtualCpu;
8484
}
8585

@@ -140,11 +140,11 @@ public static NodeResources create() {
140140
return create(0,0);
141141
}
142142

143-
public static NodeResources create(long cpu) {
143+
public static NodeResources create(int cpu) {
144144
return create(cpu,0);
145145
}
146146

147-
public static NodeResources create(long cpu, long memory) {
147+
public static NodeResources create(int cpu, long memory) {
148148
return new NodeResources(CURRENT_VERSION, memory, cpu);
149149
}
150150

0 commit comments

Comments
 (0)