18
18
package org .apache .drill .exec .planner .fragment ;
19
19
20
20
import org .apache .commons .lang3 .tuple .Pair ;
21
+ import org .apache .drill .common .exceptions .ExecutionSetupException ;
21
22
import org .apache .drill .common .util .function .CheckedConsumer ;
22
23
import org .apache .drill .exec .ops .QueryContext ;
23
24
import org .apache .drill .exec .physical .PhysicalOperatorSetupException ;
24
25
import org .apache .drill .exec .physical .base .PhysicalOperator ;
25
- import org .apache .drill .exec .planner .cost .NodeResource ;
26
26
import org .apache .drill .exec .proto .CoordinationProtos .DrillbitEndpoint ;
27
+ import org .apache .drill .exec .resourcemgr .NodeResources ;
28
+ import org .apache .drill .exec .resourcemgr .config .QueryQueueConfig ;
29
+ import org .apache .drill .exec .resourcemgr .config .exception .QueueSelectionException ;
30
+ import org .apache .drill .exec .work .foreman .rm .QueryResourceManager ;
27
31
28
32
import java .util .Map ;
29
33
import java .util .HashMap ;
43
47
public class DistributedQueueParallelizer extends SimpleParallelizer {
44
48
private final boolean planHasMemory ;
45
49
private final QueryContext queryContext ;
50
+ private final QueryResourceManager rm ;
46
51
private final Map <DrillbitEndpoint , Map <PhysicalOperator , Long >> operators ;
47
52
48
- public DistributedQueueParallelizer (boolean memoryPlanning , QueryContext queryContext ) {
53
/**
 * Builds a parallelizer that keeps the query context and the query resource manager
 * for use during memory adjustment.
 *
 * @param memoryPlanning stored as {@code planHasMemory}; when true, {@code adjustMemory} returns immediately.
 * @param queryContext context handed to the parent parallelizer and retained by this instance.
 * @param queryRM resource manager retained by this instance for queue selection and cost reporting.
 */
public DistributedQueueParallelizer(boolean memoryPlanning, QueryContext queryContext, QueryResourceManager queryRM) {
  super(queryContext);
  // Per-endpoint operator memory, filled in by adjustMemory.
  this.operators = new HashMap<>();
  this.rm = queryRM;
  this.queryContext = queryContext;
  this.planHasMemory = memoryPlanning;
}
54
60
@@ -75,45 +81,75 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
75
81
*
76
82
* @param planningSet context of the fragments.
77
83
* @param roots root fragments.
78
- * @param activeEndpoints currently active endpoints.
84
+ * @param onlineEndpointUUIDs map from each currently online drillbit endpoint to its UUID.
79
85
* @throws PhysicalOperatorSetupException
80
86
*/
81
87
public void adjustMemory (PlanningSet planningSet , Set <Wrapper > roots ,
82
- Collection <DrillbitEndpoint > activeEndpoints ) throws PhysicalOperatorSetupException {
88
+ Map <DrillbitEndpoint , String > onlineEndpointUUIDs ) throws ExecutionSetupException {
83
89
84
90
if (planHasMemory ) {
85
91
return ;
86
92
}
87
93
// total node resources for the query plan maintained per drillbit.
88
- final Map <DrillbitEndpoint , NodeResource > totalNodeResources =
89
- activeEndpoints .stream ().collect (Collectors .toMap (x ->x ,
90
- x -> NodeResource .create ()));
94
+ final Map <DrillbitEndpoint , NodeResources > totalNodeResources =
95
+ onlineEndpointUUIDs . keySet () .stream ().collect (Collectors .toMap (x ->x ,
96
+ x -> NodeResources .create ()));
91
97
92
98
// list of the physical operators and their memory requirements per drillbit.
93
99
final Map <DrillbitEndpoint , List <Pair <PhysicalOperator , Long >>> operators =
94
- activeEndpoints .stream ().collect (Collectors .toMap (x -> x ,
100
+ onlineEndpointUUIDs . keySet () .stream ().collect (Collectors .toMap (x -> x ,
95
101
x -> new ArrayList <>()));
96
102
97
103
for (Wrapper wrapper : roots ) {
98
104
traverse (wrapper , CheckedConsumer .throwingConsumerWrapper ((Wrapper fragment ) -> {
99
105
MemoryCalculator calculator = new MemoryCalculator (planningSet , queryContext );
100
106
fragment .getNode ().getRoot ().accept (calculator , fragment );
101
- NodeResource .merge (totalNodeResources , fragment .getResourceMap ());
107
+ NodeResources .merge (totalNodeResources , fragment .getResourceMap ());
102
108
operators .entrySet ()
103
109
.stream ()
104
110
.forEach ((entry ) -> entry .getValue ()
105
111
.addAll (calculator .getBufferedOperators (entry .getKey ())));
106
112
}));
107
113
}
108
- //queryrm.selectQueue( pass the max node Resource) returns queue configuration.
109
- Map <DrillbitEndpoint , List <Pair <PhysicalOperator , Long >>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits (operators , totalNodeResources , 10 );
114
+
115
+ QueryQueueConfig queueConfig = null ;
116
+ try {
117
+ queueConfig = this .rm .selectQueue (max (totalNodeResources .values ()));
118
+ } catch (QueueSelectionException exception ) {
119
+ throw new ExecutionSetupException (exception .getMessage ());
120
+ }
121
+
122
+ Map <DrillbitEndpoint ,
123
+ List <Pair <PhysicalOperator , Long >>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits (operators , totalNodeResources ,
124
+ queueConfig .getMaxQueryMemoryInMBPerNode ());
110
125
memoryAdjustedOperators .entrySet ().stream ().forEach ((x ) -> {
111
126
Map <PhysicalOperator , Long > memoryPerOperator = x .getValue ().stream ()
112
127
.collect (Collectors .toMap (operatorLongPair -> operatorLongPair .getLeft (),
113
128
operatorLongPair -> operatorLongPair .getRight (),
114
129
(mem_1 , mem_2 ) -> (mem_1 + mem_2 )));
115
130
this .operators .put (x .getKey (), memoryPerOperator );
116
131
});
132
+
133
+ this .rm .setCost (convertToUUID (totalNodeResources , onlineEndpointUUIDs ));
134
+ }
135
+
136
+ private Map <String , NodeResources > convertToUUID (Map <DrillbitEndpoint , NodeResources > nodeResourcesMap ,
137
+ Map <DrillbitEndpoint , String > onlineEndpointUUIDs ) {
138
+ Map <String , NodeResources > nodeResourcesPerUUID = new HashMap <>();
139
+ for (Map .Entry <DrillbitEndpoint , NodeResources > nodeResource : nodeResourcesMap .entrySet ()) {
140
+ nodeResourcesPerUUID .put (onlineEndpointUUIDs .get (nodeResource .getKey ()), nodeResource .getValue ());
141
+ }
142
+ return nodeResourcesPerUUID ;
143
+ }
144
+
145
+ private NodeResources max (Collection <NodeResources > resources ) {
146
+ NodeResources maxResource = null ;
147
+ for (NodeResources resource : resources ) {
148
+ if (maxResource == null || maxResource .getMemoryInBytes () < resource .getMemoryInBytes ()) {
149
+ maxResource = resource ;
150
+ }
151
+ }
152
+ return maxResource ;
117
153
}
118
154
119
155
@@ -126,12 +162,12 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
126
162
*/
127
163
private Map <DrillbitEndpoint , List <Pair <PhysicalOperator , Long >>>
128
164
ensureOperatorMemoryWithinLimits (Map <DrillbitEndpoint , List <Pair <PhysicalOperator , Long >>> memoryPerOperator ,
129
- Map <DrillbitEndpoint , NodeResource > nodeResourceMap , int nodeLimit ) {
165
+ Map <DrillbitEndpoint , NodeResources > nodeResourceMap , long nodeLimit ) {
130
166
// Get the physical operators which are above the node memory limit.
131
167
Map <DrillbitEndpoint , List <Pair <PhysicalOperator , Long >>> onlyMemoryAboveLimitOperators = new HashMap <>();
132
168
memoryPerOperator .entrySet ().stream ().forEach ((entry ) -> {
133
169
onlyMemoryAboveLimitOperators .putIfAbsent (entry .getKey (), new ArrayList <>());
134
- if (nodeResourceMap .get (entry .getKey ()).getMemory () > nodeLimit ) {
170
+ if (nodeResourceMap .get (entry .getKey ()).getMemoryInBytes () > nodeLimit ) {
135
171
onlyMemoryAboveLimitOperators .get (entry .getKey ()).addAll (entry .getValue ());
136
172
}
137
173
});
@@ -148,6 +184,8 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
148
184
return Pair .of (operatorMemory .getKey (), (long ) Math .ceil (operatorMemory .getValue ()/totalMemory * nodeLimit ));
149
185
}).collect (Collectors .toList ());
150
186
memoryAdjustedDrillbits .put (entry .getKey (), adjustedMemory );
187
+ NodeResources nodeResources = nodeResourceMap .get (entry .getKey ());
188
+ nodeResources .setMemoryInBytes (adjustedMemory .stream ().mapToLong (Pair ::getValue ).sum ());
151
189
}
152
190
);
153
191
0 commit comments