Skip to content

Commit 2cac65a

Browse files
committed
Introduce ServiceVersion framework to support backward compatible and reduce BlcokIdLayout message
1 parent 2b70eb4 commit 2cac65a

File tree

17 files changed

+268
-53
lines changed

17 files changed

+268
-53
lines changed

client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,8 @@ public void registerShuffle(
583583
dataDistributionType,
584584
maxConcurrencyPerPartitionToWrite,
585585
stageAttemptNumber,
586-
mergeContext);
586+
mergeContext,
587+
blockIdLayout);
587588
RssRegisterShuffleResponse response =
588589
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);
589590

common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class ShuffleServerInfo implements Serializable {
3535

3636
private int nettyPort = -1;
3737

38+
private int serviceVersion = 0;
39+
3840
@VisibleForTesting
3941
public ShuffleServerInfo(String host, int port) {
4042
this.id = host + "-" + port;
@@ -57,10 +59,16 @@ public ShuffleServerInfo(String id, String host, int port) {
5759
}
5860

5961
public ShuffleServerInfo(String id, String host, int grpcPort, int nettyPort) {
62+
this(id, host, grpcPort, nettyPort, 0);
63+
}
64+
65+
public ShuffleServerInfo(
66+
String id, String host, int grpcPort, int nettyPort, int serviceVersion) {
6067
this.id = id;
6168
this.host = host;
6269
this.grpcPort = grpcPort;
6370
this.nettyPort = nettyPort;
71+
this.serviceVersion = serviceVersion;
6472
}
6573

6674
public String getId() {
@@ -79,6 +87,10 @@ public int getNettyPort() {
7987
return nettyPort;
8088
}
8189

90+
public int getServiceVersion() {
91+
return serviceVersion;
92+
}
93+
8294
@Override
8395
public int hashCode() {
8496
// By default id = host + "-" + grpc port, if netty port is greater than 0,
@@ -121,7 +133,8 @@ private static ShuffleServerInfo convertFromShuffleServerId(
121133
shuffleServerId.getId(),
122134
shuffleServerId.getIp(),
123135
shuffleServerId.getPort(),
124-
shuffleServerId.getNettyPort());
136+
shuffleServerId.getNettyPort(),
137+
shuffleServerId.getServiceVersion());
125138
return shuffleServerInfo;
126139
}
127140

common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
3636
String host = ByteBufUtils.readLengthAndString(byteBuf);
3737
int grpcPort = byteBuf.readInt();
3838
int nettyPort = byteBuf.readInt();
39-
return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
39+
// this decodeShuffleServerInfo method is deprecated,
40+
// clients do not need to encode service version
41+
return new ShuffleServerInfo(id, host, grpcPort, nettyPort, 0);
4042
}
4143

4244
public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.rpc;
19+
20+
import java.util.Arrays;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
23+
24+
public class ServiceVersion {
25+
public static final ServiceVersion NEWEST_VERSION =
26+
new ServiceVersion(Feature.getNewest().getVersion());
27+
static final Map<Integer, Feature> VALUE_MAP =
28+
Arrays.stream(Feature.values()).collect(Collectors.toMap(Feature::getVersion, s -> s));
29+
private final int version;
30+
31+
public ServiceVersion(int version) {
32+
this.version = version;
33+
}
34+
35+
public Feature getCurrentFeature() {
36+
return VALUE_MAP.get(version);
37+
}
38+
39+
public boolean supportFeature(Feature registerBlockIdLayout) {
40+
return version >= registerBlockIdLayout.getVersion();
41+
}
42+
43+
public int getVersion() {
44+
return version;
45+
}
46+
47+
public enum Feature {
48+
// Treat the old version as init version
49+
INIT_VERSION(0),
50+
// Register block id layout to server to avoid sending block id layout for each getShuffleResult
51+
// request
52+
REGISTER_BLOCK_ID_LAYOUT(1),
53+
;
54+
55+
private final int version;
56+
57+
Feature(int version) {
58+
this.version = version;
59+
}
60+
61+
public int getVersion() {
62+
return version;
63+
}
64+
65+
public static Feature getNewest() {
66+
Feature[] enumConstants = Feature.class.getEnumConstants();
67+
return enumConstants[enumConstants.length - 1];
68+
}
69+
}
70+
}

common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.uniffle.common.config.RssClientConf;
2323
import org.apache.uniffle.common.config.RssConf;
24+
import org.apache.uniffle.proto.RssProtos;
2425

2526
/**
2627
* This represents the actual bit layout of {@link BlockId}s.
@@ -194,6 +195,14 @@ public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
194195
(int) taskAttemptId);
195196
}
196197

198+
public RssProtos.BlockIdLayout toProto() {
199+
return RssProtos.BlockIdLayout.newBuilder()
200+
.setSequenceNoBits(sequenceNoBits)
201+
.setPartitionIdBits(partitionIdBits)
202+
.setTaskAttemptIdBits(taskAttemptIdBits)
203+
.build();
204+
}
205+
197206
public static BlockIdLayout from(RssConf rssConf) {
198207
int sequenceBits = rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS);
199208
int partitionBits = rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS);

coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
538538
request.getStartTimeMs(),
539539
request.getVersion(),
540540
request.getGitCommitId(),
541-
request.getApplicationInfoList());
541+
request.getApplicationInfoList(),
542+
request.getServiceVersion());
542543
}
543544

544545
/**

coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Set;
2424
import java.util.concurrent.ConcurrentHashMap;
2525

26+
import com.google.common.annotations.VisibleForTesting;
2627
import com.google.common.collect.Maps;
2728
import com.google.common.collect.Sets;
2829

@@ -33,6 +34,7 @@
3334

3435
public class ServerNode implements Comparable<ServerNode> {
3536

37+
private final int serviceVersion;
3638
private String id;
3739
private String ip;
3840
private int grpcPort;
@@ -56,7 +58,7 @@ public ServerNode(String id) {
5658
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
5759
}
5860

59-
// Only for test
61+
@VisibleForTesting
6062
public ServerNode(
6163
String id,
6264
String ip,
@@ -129,6 +131,7 @@ public ServerNode(
129131
-1);
130132
}
131133

134+
@VisibleForTesting
132135
public ServerNode(
133136
String id,
134137
String ip,
@@ -187,7 +190,8 @@ public ServerNode(
187190
startTime,
188191
"",
189192
"",
190-
Collections.EMPTY_LIST);
193+
Collections.EMPTY_LIST,
194+
0);
191195
}
192196

193197
public ServerNode(
@@ -206,7 +210,8 @@ public ServerNode(
206210
long startTime,
207211
String version,
208212
String gitCommitId,
209-
List<RssProtos.ApplicationInfo> appInfos) {
213+
List<RssProtos.ApplicationInfo> appInfos,
214+
int serviceVersion) {
210215
this.id = id;
211216
this.ip = ip;
212217
this.grpcPort = grpcPort;
@@ -230,6 +235,7 @@ public ServerNode(
230235
this.gitCommitId = gitCommitId;
231236
this.appIdToInfos = new ConcurrentHashMap<>();
232237
appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
238+
this.serviceVersion = serviceVersion;
233239
}
234240

235241
public ShuffleServerId convertToGrpcProto() {
@@ -239,6 +245,7 @@ public ShuffleServerId convertToGrpcProto() {
239245
.setPort(grpcPort)
240246
.setNettyPort(nettyPort)
241247
.setJettyPort(jettyPort)
248+
.setServiceVersion(serviceVersion)
242249
.build();
243250
}
244251

internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,17 @@ private ShuffleServerClient createShuffleServerClient(
4747
String clientType, ShuffleServerInfo shuffleServerInfo, RssConf rssConf) {
4848
if (clientType.equalsIgnoreCase(ClientType.GRPC.name())) {
4949
return new ShuffleServerGrpcClient(
50-
rssConf, shuffleServerInfo.getHost(), shuffleServerInfo.getGrpcPort());
50+
rssConf,
51+
shuffleServerInfo.getHost(),
52+
shuffleServerInfo.getGrpcPort(),
53+
shuffleServerInfo.getServiceVersion());
5154
} else if (clientType.equalsIgnoreCase(ClientType.GRPC_NETTY.name())) {
5255
return new ShuffleServerGrpcNettyClient(
5356
rssConf,
5457
shuffleServerInfo.getHost(),
5558
shuffleServerInfo.getGrpcPort(),
56-
shuffleServerInfo.getNettyPort());
59+
shuffleServerInfo.getNettyPort(),
60+
shuffleServerInfo.getServiceVersion());
5761
} else {
5862
throw new UnsupportedOperationException("Unsupported client type " + clientType);
5963
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.uniffle.common.ServerStatus;
5353
import org.apache.uniffle.common.ShuffleServerInfo;
5454
import org.apache.uniffle.common.exception.RssException;
55+
import org.apache.uniffle.common.rpc.ServiceVersion;
5556
import org.apache.uniffle.common.rpc.StatusCode;
5657
import org.apache.uniffle.common.storage.StorageInfo;
5758
import org.apache.uniffle.common.storage.StorageInfoUtils;
@@ -151,6 +152,7 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat(
151152
.setVersion(Constants.VERSION)
152153
.setGitCommitId(Constants.REVISION_SHORT)
153154
.addAllApplicationInfo(appInfos)
155+
.setServiceVersion(ServiceVersion.NEWEST_VERSION.getVersion())
154156
.build();
155157

156158
RssProtos.StatusCode status;
@@ -424,7 +426,11 @@ public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(
424426
.map(
425427
ss ->
426428
new ShuffleServerInfo(
427-
ss.getId(), ss.getIp(), ss.getPort(), ss.getNettyPort()))
429+
ss.getId(),
430+
ss.getIp(),
431+
ss.getPort(),
432+
ss.getNettyPort(),
433+
ss.getServiceVersion()))
428434
.collect(Collectors.toList());
429435
for (int i = startPartition; i <= endPartition; i++) {
430436
partitionToServers.put(i, shuffleServerInfos);
@@ -449,7 +455,12 @@ public Map<ShuffleServerInfo, List<PartitionRange>> getServerToPartitionRanges(
449455
new PartitionRange(assign.getStartPartition(), assign.getEndPartition());
450456
for (ShuffleServerId ssi : shuffleServerIds) {
451457
ShuffleServerInfo shuffleServerInfo =
452-
new ShuffleServerInfo(ssi.getId(), ssi.getIp(), ssi.getPort(), ssi.getNettyPort());
458+
new ShuffleServerInfo(
459+
ssi.getId(),
460+
ssi.getIp(),
461+
ssi.getPort(),
462+
ssi.getNettyPort(),
463+
ssi.getServiceVersion());
453464
if (!serverToPartitionRanges.containsKey(shuffleServerInfo)) {
454465
serverToPartitionRanges.put(shuffleServerInfo, Lists.newArrayList());
455466
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,35 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28+
import org.apache.uniffle.common.rpc.ServiceVersion;
2829
import org.apache.uniffle.common.util.GrpcNettyUtils;
2930

3031
public abstract class GrpcClient {
3132

3233
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
3334
protected String host;
3435
protected int port;
36+
protected ServiceVersion serviceVersion;
3537
protected boolean usePlaintext;
3638
protected int maxRetryAttempts;
3739
protected ManagedChannel channel;
3840

3941
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
40-
this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0);
42+
this(host, port, 0, maxRetryAttempts, usePlaintext, 0, 0, 0);
4143
}
4244

4345
protected GrpcClient(
4446
String host,
4547
int port,
48+
int serviceVersion,
4649
int maxRetryAttempts,
4750
boolean usePlaintext,
4851
int pageSize,
4952
int maxOrder,
5053
int smallCacheSize) {
5154
this.host = host;
5255
this.port = port;
56+
this.serviceVersion = new ServiceVersion(serviceVersion);
5357
this.maxRetryAttempts = maxRetryAttempts;
5458
this.usePlaintext = usePlaintext;
5559

@@ -75,6 +79,10 @@ protected GrpcClient(ManagedChannel channel) {
7579
this.channel = channel;
7680
}
7781

82+
public ServiceVersion getServiceVersion() {
83+
return serviceVersion;
84+
}
85+
7886
public void close() {
7987
try {
8088
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)