17
17
*/
18
18
package org .apache .drill .exec .coord .zk ;
19
19
20
- import static org .apache .drill .shaded .guava .com .google .common .collect .Collections2 .transform ;
21
- import java .io .IOException ;
22
- import java .util .Collection ;
23
- import java .util .Collections ;
24
- import java .util .ArrayList ;
25
- import java .util .Set ;
26
- import java .util .HashSet ;
27
- import java .util .concurrent .ConcurrentHashMap ;
28
- import java .util .concurrent .CountDownLatch ;
29
- import java .util .concurrent .TimeUnit ;
30
- import java .util .regex .Matcher ;
31
- import java .util .regex .Pattern ;
32
-
33
- import org .apache .curator .framework .imps .DefaultACLProvider ;
34
- import org .apache .drill .shaded .guava .com .google .common .base .Throwables ;
35
- import org .apache .commons .collections .keyvalue .MultiKey ;
36
20
import org .apache .curator .RetryPolicy ;
37
21
import org .apache .curator .framework .CuratorFramework ;
38
22
import org .apache .curator .framework .CuratorFrameworkFactory ;
39
23
import org .apache .curator .framework .api .ACLProvider ;
24
+ import org .apache .curator .framework .imps .DefaultACLProvider ;
40
25
import org .apache .curator .framework .state .ConnectionState ;
41
26
import org .apache .curator .framework .state .ConnectionStateListener ;
42
27
import org .apache .curator .retry .RetryNTimes ;
57
42
import org .apache .drill .exec .coord .store .TransientStoreFactory ;
58
43
import org .apache .drill .exec .proto .CoordinationProtos .DrillbitEndpoint ;
59
44
import org .apache .drill .exec .proto .CoordinationProtos .DrillbitEndpoint .State ;
60
- import org .apache .drill .shaded .guava .com .google .common .base .Function ;
45
+ import org .apache .drill .shaded .guava .com .google .common .base .Throwables ;
46
+
47
+ import java .io .IOException ;
48
+ import java .util .Collection ;
49
+ import java .util .Collections ;
50
+ import java .util .HashMap ;
51
+ import java .util .HashSet ;
52
+ import java .util .Map ;
53
+ import java .util .Set ;
54
+ import java .util .concurrent .ConcurrentHashMap ;
55
+ import java .util .concurrent .CountDownLatch ;
56
+ import java .util .concurrent .TimeUnit ;
57
+ import java .util .regex .Matcher ;
58
+ import java .util .regex .Pattern ;
59
+ import java .util .stream .Collectors ;
61
60
62
61
/**
63
62
* Manages cluster coordination utilizing zookeeper. *
@@ -74,8 +73,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
74
73
private ServiceCache <DrillbitEndpoint > serviceCache ;
75
74
private DrillbitEndpoint endpoint ;
76
75
77
- // endpointsMap maps Multikey( comprises of endoint address and port) to Drillbit endpoints
78
- private ConcurrentHashMap <MultiKey , DrillbitEndpoint > endpointsMap = new ConcurrentHashMap <MultiKey , DrillbitEndpoint >();
76
+ // endpointsMap maps String UUID to Drillbit endpoints
77
+ private ConcurrentHashMap <String , DrillbitEndpoint > endpointsMap = new ConcurrentHashMap <>();
79
78
private static final Pattern ZK_COMPLEX_STRING = Pattern .compile ("(^.*?)/(.*)/([^/]*)$" );
80
79
81
80
public ZKClusterCoordinator (DrillConfig config , String connect ) {
@@ -237,7 +236,12 @@ public RegistrationHandle update(RegistrationHandle handle, State state) {
237
236
238
237
@ Override
239
238
public Collection <DrillbitEndpoint > getAvailableEndpoints () {
240
- return this .endpoints ;
239
+ return getAvailableEndpointsUUID ().values ();
240
+ }
241
+
242
+ @ Override
243
+ public Map <String , DrillbitEndpoint > getAvailableEndpointsUUID () {
244
+ return this .endpointsMap ;
241
245
}
242
246
243
247
/*
@@ -249,14 +253,19 @@ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
249
253
*/
250
254
@ Override
251
255
public Collection <DrillbitEndpoint > getOnlineEndPoints () {
252
- Collection <DrillbitEndpoint > runningEndPoints = new ArrayList <>();
253
- for (DrillbitEndpoint endpoint : endpoints ){
254
- if (isDrillbitInState (endpoint , State .ONLINE )) {
255
- runningEndPoints .add (endpoint );
256
+ return getOnlineEndpointsUUID ().keySet ();
257
+ }
258
+
259
+ @ Override
260
+ public Map <DrillbitEndpoint , String > getOnlineEndpointsUUID () {
261
+ Map <DrillbitEndpoint , String > onlineEndpointsUUID = new HashMap <>();
262
+ for (Map .Entry <String , DrillbitEndpoint > endpointEntry : endpointsMap .entrySet ()) {
263
+ if (isDrillbitInState (endpointEntry .getValue (), State .ONLINE )) {
264
+ onlineEndpointsUUID .put (endpointEntry .getValue (), endpointEntry .getKey ());
256
265
}
257
266
}
258
- logger .debug ("Online endpoints in ZK are" + runningEndPoints .toString ());
259
- return runningEndPoints ;
267
+ logger .debug ("Online endpoints in ZK are" + onlineEndpointsUUID . keySet () .toString ());
268
+ return onlineEndpointsUUID ;
260
269
}
261
270
262
271
@ Override
@@ -273,14 +282,11 @@ public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfi
273
282
private synchronized void updateEndpoints () {
274
283
try {
275
284
// All active bits in the Zookeeper
276
- Collection <DrillbitEndpoint > newDrillbitSet =
277
- transform (discovery .queryForInstances (serviceName ),
278
- new Function <ServiceInstance <DrillbitEndpoint >, DrillbitEndpoint >() {
279
- @ Override
280
- public DrillbitEndpoint apply (ServiceInstance <DrillbitEndpoint > input ) {
281
- return input .getPayload ();
282
- }
283
- });
285
+ final Map <String , DrillbitEndpoint > activeEndpointsUUID = discovery .queryForInstances (serviceName ).stream ()
286
+ .collect (Collectors .toMap (ServiceInstance ::getId , ServiceInstance ::getPayload ));
287
+
288
+ final Map <DrillbitEndpoint , String > UUIDtoEndpoints = activeEndpointsUUID .entrySet ().stream ()
289
+ .collect (Collectors .toMap (Map .Entry ::getValue , Map .Entry ::getKey ));
284
290
285
291
// set of newly dead bits : original bits - new set of active bits.
286
292
Set <DrillbitEndpoint > unregisteredBits = new HashSet <>();
@@ -290,29 +296,32 @@ public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
290
296
291
297
// Updates the endpoints map if there is a change in state of the endpoint or with the addition
292
298
// of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
293
- for ( DrillbitEndpoint endpoint : newDrillbitSet ) {
294
- String endpointAddress = endpoint .getAddress ();
295
- int endpointPort = endpoint .getUserPort ();
296
- if (! endpointsMap .containsKey (new MultiKey (endpointAddress , endpointPort ))) {
297
- registeredBits .add (endpoint );
298
- }
299
- endpointsMap .put (new MultiKey (endpointAddress , endpointPort ),endpoint );
299
+ for (Map .Entry <String , DrillbitEndpoint > endpointToUUID : activeEndpointsUUID .entrySet ()) {
300
+ endpointsMap .put (endpointToUUID .getKey (), endpointToUUID .getValue ());
300
301
}
302
+
301
303
// Remove all the endpoints that are newly dead
302
- for ( MultiKey key : endpointsMap .keySet ()) {
303
- if (!newDrillbitSet .contains (endpointsMap .get (key ))) {
304
- unregisteredBits .add (endpointsMap .get (key ));
305
- endpointsMap .remove (key );
304
+ for ( String bitUUID : endpointsMap .keySet ()) {
305
+ if (!activeEndpointsUUID .containsKey (bitUUID )) {
306
+ final DrillbitEndpoint unregisteredBit = endpointsMap .get (bitUUID );
307
+ unregisteredBits .add (unregisteredBit );
308
+
309
+ if (UUIDtoEndpoints .containsKey (unregisteredBit )) {
310
+ logger .info ("Drillbit registered again with different UUID. [Details: Address: {}, UserPort: {}," +
311
+ " PreviousUUID: {}, CurrentUUID: {}" , unregisteredBit .getAddress (), unregisteredBit .getUserPort (),
312
+ bitUUID , UUIDtoEndpoints .get (unregisteredBit ));
313
+ }
314
+ endpointsMap .remove (bitUUID );
306
315
}
307
316
}
308
317
endpoints = endpointsMap .values ();
309
318
if (logger .isDebugEnabled ()) {
310
319
StringBuilder builder = new StringBuilder ();
311
320
builder .append ("Active drillbit set changed. Now includes " );
312
- builder .append (newDrillbitSet .size ());
321
+ builder .append (activeEndpointsUUID .size ());
313
322
builder .append (" total bits. New active drillbits:\n " );
314
323
builder .append ("Address | User Port | Control Port | Data Port | Version | State\n " );
315
- for (DrillbitEndpoint bit : newDrillbitSet ) {
324
+ for (DrillbitEndpoint bit : activeEndpointsUUID . values () ) {
316
325
builder .append (bit .getAddress ()).append (" | " );
317
326
builder .append (bit .getUserPort ()).append (" | " );
318
327
builder .append (bit .getControlPort ()).append (" | " );
0 commit comments