Skip to content

Commit d8ca817

Browse files
committed
Add support for multiple cluster-ids in one setup
1 parent b3967ae commit d8ca817

12 files changed

+341
-136
lines changed

uniqueid-etcd/src/main/java/org/lable/oss/uniqueid/etcd/ClusterID.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
import java.io.IOException;
2424
import java.nio.charset.StandardCharsets;
25+
import java.util.Collections;
26+
import java.util.List;
2527
import java.util.concurrent.ExecutionException;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
2630

2731
public class ClusterID {
2832
final static ByteSequence CLUSTER_ID_KEY = ByteSequence.from("cluster-id", StandardCharsets.UTF_8);
@@ -35,34 +39,46 @@ public class ClusterID {
3539
* @return The cluster ID, if configured in the cluster.
3640
* @throws IOException Thrown when retrieving the ID fails.
3741
*/
38-
public static int get(Client etcd) throws IOException {
42+
public static List<Integer> get(Client etcd) throws IOException {
3943
GetResponse get;
4044
try {
4145
get = etcd.getKVClient().get(CLUSTER_ID_KEY).get();
4246
} catch (InterruptedException | ExecutionException e) {
4347
throw new IOException(e);
4448
}
4549

46-
Integer id = null;
50+
List<Integer> ids = null;
4751

4852
for (KeyValue kv : get.getKvs()) {
4953
if (kv.getKey().equals(CLUSTER_ID_KEY)) {
54+
// There should be only one key returned.
5055
String value = kv.getValue().toString(StandardCharsets.UTF_8);
51-
id = Integer.parseInt(value);
56+
try {
57+
ids = parseIntegers(value);
58+
} catch (NumberFormatException e) {
59+
throw new IOException("Failed to parse cluster-id value `" + value + "`.", e);
60+
}
5261
break;
5362
}
5463
}
5564

56-
if (id == null) {
65+
if (ids == null) {
5766
ByteSequence defaultValue = ByteSequence.from(String.valueOf(DEFAULT_CLUSTER_ID).getBytes());
5867
try {
5968
etcd.getKVClient().put(CLUSTER_ID_KEY, defaultValue).get();
60-
return DEFAULT_CLUSTER_ID;
69+
return Collections.singletonList(DEFAULT_CLUSTER_ID);
6170
} catch (InterruptedException | ExecutionException e) {
6271
throw new IOException(e);
6372
}
6473
} else {
65-
return id;
74+
return ids;
6675
}
6776
}
77+
78+
static List<Integer> parseIntegers(String serialized) {
79+
return Stream.of(serialized.split(","))
80+
.map(String::trim)
81+
.map(Integer::parseInt)
82+
.collect(Collectors.toList());
83+
}
6884
}

uniqueid-etcd/src/main/java/org/lable/oss/uniqueid/etcd/ExpiringResourceClaim.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.time.Duration;
24+
import java.util.List;
2425
import java.util.Timer;
2526
import java.util.TimerTask;
2627

@@ -35,10 +36,11 @@ public class ExpiringResourceClaim extends ResourceClaim {
3536
public final static Duration DEFAULT_ACQUISITION_TIMEOUT = Duration.ofMinutes(10);
3637

3738
ExpiringResourceClaim(Client etcd,
38-
int poolSize,
39+
int maxGeneratorCount,
40+
List<Integer> clusterIds,
3941
Duration claimHold,
4042
Duration acquisitionTimeout) throws IOException {
41-
super(etcd, poolSize, acquisitionTimeout);
43+
super(etcd, maxGeneratorCount, clusterIds, acquisitionTimeout);
4244
new Timer().schedule(new TimerTask() {
4345
@Override
4446
public void run() {
@@ -50,28 +52,30 @@ public void run() {
5052
/**
5153
* Claim a resource.
5254
*
53-
* @param etcd Etcd connection to use.
54-
* @param poolSize Size of the resource pool.
55+
* @param etcd Etcd connection to use.
56+
* @param maxGeneratorCount Maximum number of generators possible.
5557
* @return A resource claim.
5658
*/
57-
public static ResourceClaim claimExpiring(Client etcd, int poolSize)
59+
public static ResourceClaim claimExpiring(Client etcd, int maxGeneratorCount, List<Integer> clusterIds)
5860
throws IOException {
59-
return claimExpiring(etcd, poolSize, DEFAULT_CLAIM_HOLD, DEFAULT_ACQUISITION_TIMEOUT);
61+
return claimExpiring(etcd, maxGeneratorCount, clusterIds, DEFAULT_CLAIM_HOLD, DEFAULT_ACQUISITION_TIMEOUT);
6062
}
6163

6264
/**
6365
* Claim a resource.
6466
*
6567
* @param etcd Etcd connection to use.
66-
* @param poolSize Size of the resource pool.
68+
* @param maxGeneratorCount Maximum number of generators possible.
69+
* @param clusterIds Cluster Ids available to use.
6770
* @param claimHold How long the claim should be held. May be {@code null} for the default value of
6871
* {@link #DEFAULT_CLAIM_HOLD}.
6972
* @param acquisitionTimeout How long to keep trying to acquire a claim. May be {@code null} to keep trying
7073
* indefinitely.
7174
* @return A resource claim.
7275
*/
7376
public static ResourceClaim claimExpiring(Client etcd,
74-
int poolSize,
77+
int maxGeneratorCount,
78+
List<Integer> clusterIds,
7579
Duration claimHold,
7680
Duration acquisitionTimeout)
7781
throws IOException {
@@ -81,6 +85,6 @@ public static ResourceClaim claimExpiring(Client etcd,
8185
logger.debug("Preparing expiring resource-claim; will release it in {}ms.", claimHold.toMillis());
8286
}
8387

84-
return new ExpiringResourceClaim(etcd, poolSize, claimHold, acquisitionTimeout);
88+
return new ExpiringResourceClaim(etcd, maxGeneratorCount, clusterIds, claimHold, acquisitionTimeout);
8589
}
8690
}

0 commit comments

Comments
 (0)