task = cluster.join();
+ try {
+ task.get();
+ }
+ catch (Exception e) {
+ throw CheckedExceptions.wrapAsRuntimeException(e);
+ }
+ Logger.info("Concourse Server has joined a distributed cluster");
+ }
+
+ // Load the Engine for the default environment
+ getEngine();
for (Engine engine : engines.values()) {
engine.start();
}
+
httpServer.start();
pluginManager.start();
Thread mgmtThread = new Thread(() -> {
@@ -6089,6 +6121,10 @@ public void start() throws TTransportException {
@PluginRestricted
public void stop() {
if(server.isServing()) {
+ if(cluster != null) {
+ cluster.leave();
+ Logger.info("Concourse Server has left a distributed cluster");
+ }
mgmtServer.stop();
server.stop();
pluginManager.stop();
@@ -6500,12 +6536,7 @@ protected String getDbStore() {
*/
@Internal
protected Engine getEngine(String env) {
- Engine engine = engines.get(env);
- if(engine == null) {
- env = Environments.sanitize(env);
- return getEngineUnsafe(env);
- }
- return engine;
+ return getEngineUnsafe(Environments.sanitize(env));
}
@Override
@@ -6557,6 +6588,17 @@ private Engine getEngineUnsafe(String env) {
String buffer = bufferStore + File.separator + env;
String db = dbStore + File.separator + env;
Engine engine = new Engine(buffer, db, env);
+ if(cluster != null) {
+ engine = Ensemble.replicate(engine).across(cluster);
+
+ // Notify other nodes to start the Engine for #env. Gossip is used
+ // instead of a Notice so that connecting clients are not blocked
+ // when even a single node is unavailable. Additionally, each node
+ // periodically shares its list of active Engines, so nodes that
+ // miss this initial gossip will eventually catch up.
+ cluster.spread(new StartEngineGossip(env));
+ }
engine.start();
numEnginesInitialized.incrementAndGet();
return engine;
@@ -6628,7 +6670,7 @@ private void init(int port, String bufferStore, String dbStore)
this.server = new TThreadPoolServer(args);
this.bufferStore = bufferStore;
this.dbStore = dbStore;
- this.engines = Maps.newConcurrentMap();
+ this.engines = new ConcurrentHashMap<>();
this.users = UserService.create(ACCESS_CREDENTIALS_FILE);
this.inspector = new Inspector() {
@@ -6660,7 +6702,6 @@ public boolean tokenUserHasPermission(AccessToken token,
this.httpServer = GlobalState.HTTP_PORT > 0
? HttpServer.create(this, GlobalState.HTTP_PORT)
: HttpServer.disabled();
- getEngine(); // load the default engine
this.pluginManager = new PluginManager(this,
GlobalState.CONCOURSE_HOME + File.separator + "plugins");
@@ -6671,6 +6712,44 @@ public boolean tokenUserHasPermission(AccessToken token,
.processor(new ConcourseManagementService.Processor<>(this))
.minWorkerThreads(1).maxWorkerThreads(1);
this.mgmtServer = new TThreadPoolServer(mgmtArgs);
+
+ // Set up the distributed cluster
+ if(CLUSTER.isDefined()) {
+ EnsembleSetup.interceptLogging();
+ EnsembleSetup.registerCustomSerialization();
+
+ // Claim ports used by this node to disambiguate among any other
+ // nodes that may be running on the same physical/logical machine.
+ LocalProcess.instance().clear();
+ LocalProcess.instance().claim(port);
+ // TODO: claim shutdown port?
+
+ Cluster.Builder builder = Cluster.builder();
+ builder.replicationFactor(CLUSTER.replicationFactor());
+ // TODO: add a Node for this instance just in case it isn't defined
+ // in the config? How do I detect if it is defined in the config?
+ for (String address : CLUSTER.nodes()) {
+ Node node = new Node(HostAndPort.fromString(address));
+ builder.add(node);
+ }
+
+ // Configure this Node to receive Gossip from other nodes and handle
+ // it accordingly
+ builder.handle(StartEngineGossip.class, gossip -> {
+ String environment = gossip.environment();
+ getEngineUnsafe(environment);
+ });
+
+ // Configure both the distributed framework AND internal operations
+ // to generate timestamps that are "synced" across each node in the
+ // cluster.
+ builder.clock(TimeSource.distributed());
+
+ this.cluster = builder.build();
+ }
+ else {
+ this.cluster = null;
+ }
}
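Taken together, the changes in this file give the server a full cluster lifecycle: build a Cluster from the configured specification during init, join it on start, replicate each Engine across it, gossip newly started environments to peers, and leave on stop. The condensed sketch below restates that flow using only APIs that appear in this patch; error handling is elided and the replication factor, addresses, and variable names are illustrative assumptions, not part of the change.

// Illustrative lifecycle sketch (not part of the patch); values are made up.
Cluster.Builder builder = Cluster.builder();
builder.replicationFactor(3);
builder.add(new Node(HostAndPort.fromString("10.0.0.1:1717")));
builder.add(new Node(HostAndPort.fromString("10.0.0.2:1717")));
builder.handle(StartEngineGossip.class,
        gossip -> getEngineUnsafe(gossip.environment()));
builder.clock(TimeSource.distributed());
Cluster cluster = builder.build();

cluster.join().get();                         // block until this node has joined
Engine engine = Ensemble.replicate(new Engine(buffer, db, env)).across(cluster);
engine.start();
cluster.spread(new StartEngineGossip(env));   // ask peers to start the same environment
// ... serve requests ...
cluster.leave();                              // on shutdown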
/**
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/EnsembleSetup.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/EnsembleSetup.java
new file mode 100644
index 000000000..6f8a3bed6
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/EnsembleSetup.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server;
+
+import java.nio.ByteBuffer;
+
+import com.cinchapi.common.reflect.Reflection;
+import com.cinchapi.concourse.server.storage.temp.Write;
+import com.cinchapi.concourse.thrift.Operator;
+import com.cinchapi.concourse.thrift.TObject;
+import com.cinchapi.concourse.thrift.Type;
+import com.cinchapi.concourse.util.Logger;
+import com.cinchapi.ensemble.LoggingIntegration;
+import com.cinchapi.ensemble.io.Serialization;
+
+/**
+ * Consolidated factories for configuring the Ensemble distributed system
+ * framework.
+ *
+ * @author Jeff Nelson
+ */
+public final class EnsembleSetup {
+
+ /**
+ * Intercept logging from the Ensemble framework and route it to the native
+ * {@link Logger}.
+ */
+ public static void interceptLogging() {
+ com.cinchapi.ensemble.util.Logger
+ .setLoggingIntegration(new LoggingIntegration() {
+
+ @Override
+ public void debug(String message, Object... params) {
+ Logger.debug(message, params);
+ }
+
+ @Override
+ public void error(String message, Object... params) {
+ Logger.error(message, params);
+ }
+
+ @Override
+ public void info(String message, Object... params) {
+ Logger.info(message, params);
+ }
+
+ @Override
+ public void warn(String message, Object... params) {
+ Logger.warn(message, params);
+ }
+
+ });
+ }
+
+ /**
+ * Register all custom serialization required for Ensemble.
+ */
+ public static void registerCustomSerialization() {
+ Serialization.customize(TObject.Aliases.class, object -> {
+ Serialization stream = new Serialization();
+ TObject[] values = object.values();
+ stream.writeByte((byte) object.operator().ordinal());
+ stream.writeInt(values.length);
+ for (TObject value : values) {
+ byte[] data = value.getData();
+ stream.writeByte((byte) value.getType().ordinal());
+ stream.writeInt(data.length);
+ stream.writeByteArray(data);
+ }
+ return stream.bytes();
+ }, bytes -> {
+ Serialization stream = new Serialization(bytes);
+ Operator operator = Operator.values()[stream.readByte()];
+ int length = stream.readInt();
+ TObject[] values = new TObject[length];
+ for (int i = 0; i < length; ++i) {
+ Type type = Type.values()[stream.readByte()];
+ ByteBuffer data = stream.readByteBuffer(stream.readInt());
+ values[i] = new TObject(data, type);
+ }
+ return Reflection.newInstance(TObject.Aliases.class, operator,
+ values); /* (authorized) */
+ });
+ Serialization.customize(Write.class, Write::getBytes,
+ Write::fromByteBuffer);
+ }
+
+ private EnsembleSetup() {/* no-init */}
+
+}
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/GlobalState.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/GlobalState.java
index e158ceba1..fe27d3272 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/GlobalState.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/GlobalState.java
@@ -35,6 +35,7 @@
import com.cinchapi.concourse.Constants;
import com.cinchapi.concourse.annotate.Experimental;
import com.cinchapi.concourse.annotate.NonPreference;
+import com.cinchapi.concourse.config.ConcourseClusterSpecification;
import com.cinchapi.concourse.config.ConcourseServerConfiguration;
import com.cinchapi.concourse.server.io.FileSystem;
import com.cinchapi.concourse.server.plugin.data.WriteEvent;
@@ -391,6 +392,13 @@ public final class GlobalState extends Constants {
@Experimental
public static boolean ENABLE_VERIFY_BY_LOOKUP = false;
+ /**
+ * Contains the specification for the distributed cluster of which Concourse
+ * Server is a member.
+ */
+ @Experimental
+ public static ConcourseClusterSpecification CLUSTER = ConcourseClusterSpecification.UNDEFINED;
+
/**
* Use a more memory-efficient representation for storage metadata.
*
@@ -509,6 +517,8 @@ public final class GlobalState extends Constants {
ENABLE_VERIFY_BY_LOOKUP = config.getOrDefault("enable_verify_by_lookup",
Interpreters.booleanOrNull(), ENABLE_VERIFY_BY_LOOKUP);
+ CLUSTER = ConcourseClusterSpecification.from(config);
+
INIT_ROOT_PASSWORD = config.getOrDefault("init.root.password",
config.getOrDefault("init_root_password", INIT_ROOT_PASSWORD));
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/InspectConfigCli.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/InspectConfigCli.java
new file mode 100644
index 000000000..4e92f9cd6
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/InspectConfigCli.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.cli.config;
+
+/**
+ * A CLI for inspecting Concourse Server configuration.
+ *
+ * @author Jeff Nelson
+ */
+public class InspectConfigCli {
+
+}
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/OverrideConfigCli.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/OverrideConfigCli.java
new file mode 100644
index 000000000..a2c1f2a7b
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/OverrideConfigCli.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.cli.config;
+
+import jline.console.ConsoleReader;
+
+import com.cinchapi.configctl.lib.ConfigCli;
+import com.cinchapi.lib.cli.Options;
+import com.cinchapi.lib.config.Configuration;
+
+/**
+ * A CLI for overriding Concourse Server configuration values.
+ *
+ * @author Jeff Nelson
+ */
+public class OverrideConfigCli extends ConfigCli {
+
+ @Override
+ protected void setup(Options options, ConsoleReader console) {
+ // TODO: do login for Management stuffs
+ }
+
+ /**
+ * Construct a new instance.
+ *
+ * @param args
+ */
+ public OverrideConfigCli(String[] args) {
+ super(args);
+ }
+
+ @Override
+ protected void doTask(Configuration config) {
+ // TODO: take config and create
+ System.out.println("Hey There!");
+
+ }
+
+ @Override
+ protected ConfigCliOptions getOptions() {
+ return new ValueConfigCliOptions();
+ }
+
+}
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/UpdateConfigCli.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/UpdateConfigCli.java
new file mode 100644
index 000000000..827cbdfe9
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/cli/config/UpdateConfigCli.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.cli.config;
+
+/**
+ * A CLI for updating Concourse Server configuration.
+ *
+ * @author Jeff Nelson
+ */
+public class UpdateConfigCli {
+
+}
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/gossip/StartEngineGossip.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/gossip/StartEngineGossip.java
new file mode 100644
index 000000000..bf9a24673
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/gossip/StartEngineGossip.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.gossip;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import com.cinchapi.common.base.AnyStrings;
+import com.cinchapi.ensemble.gossip.Gossip;
+import com.cinchapi.ensemble.io.Serialization;
+
+/**
+ * {@link StartEngineGossip} is a message that instructs other nodes to
+ * start the engine for the specified environment.
+ *
+ * @author Jeff Nelson
+ */
+public class StartEngineGossip extends Gossip {
+
+ /**
+ * Generate the canonical UUID for the specified {@code environment}.
+ *
+ * @param environment
+ * @return the UUID
+ */
+ private static UUID generateUUID(String environment) {
+ String name = AnyStrings.format("{} - {}",
+ StartEngineGossip.class.getName(), environment);
+ byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
+ return UUID.nameUUIDFromBytes(bytes);
+ }
+
+ /**
+ * The name of the environment for which the engine should be started.
+ */
+ private String environment;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param environment
+ */
+ public StartEngineGossip(String environment) {
+ super(generateUUID(environment));
+ this.environment = environment;
+ }
+
+ /**
+ * Construct a new instance.
+ *
+ * @param id
+ */
+ public StartEngineGossip(UUID id) {
+ super(id);
+ }
+
+ /**
+ * Return the name of the environment for which the engine should be
+ * started.
+ *
+ * @return the environment name
+ */
+ public String environment() {
+ return environment;
+ }
+
+ @Override
+ public String toString() {
+ return "Start Engine " + environment;
+ }
+
+ @Override
+ protected void readFrom(Serialization stream) {
+ environment = stream.readUtf8();
+
+ }
+
+ @Override
+ protected void writeTo(Serialization stream) {
+ stream.writeUtf8(environment);
+ }
+
+}
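Since the gossip id is a name-based UUID derived from the class name and environment, every node that gossips about the same environment should produce the same message id, which presumably lets redundant StartEngineGossip messages be recognized as duplicates. A self-contained check of that determinism property (the environment name is arbitrary and the class is referenced only by its fully qualified name):

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class GossipIdDeterminismSketch {
    public static void main(String[] args) {
        // Mirrors generateUUID(): "<class name> - <environment>"
        String name = "com.cinchapi.concourse.server.gossip.StartEngineGossip"
                + " - production";
        byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
        UUID first = UUID.nameUUIDFromBytes(bytes);
        UUID second = UUID.nameUUIDFromBytes(bytes);
        System.out.println(first.equals(second)); // true: same environment, same id
    }
}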
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/plugin/PluginManager.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/plugin/PluginManager.java
index eeae38142..7c8de6ea7 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/plugin/PluginManager.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/plugin/PluginManager.java
@@ -63,8 +63,8 @@
import com.cinchapi.concourse.thrift.AccessToken;
import com.cinchapi.concourse.thrift.ComplexTObject;
import com.cinchapi.concourse.thrift.TransactionToken;
-import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.ConcurrentMaps;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.Logger;
import com.cinchapi.concourse.util.MorePaths;
import com.cinchapi.concourse.util.Queues;
@@ -311,7 +311,7 @@ private static String getPluginTempDirectory(String plugin) {
* The session id for the {@link PluginManager}. This is used for grouping
* shared memory files.
*/
- private final static String SESSID = Long.toString(Time.now());
+ private final static String SESSID = Long.toString(Identifiers.next());
/**
* Construct a new instance.
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicOperation.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicOperation.java
index daf45a009..2b1f14b4d 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicOperation.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicOperation.java
@@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import com.cinchapi.concourse.annotate.DoNotInvoke;
import com.cinchapi.concourse.annotate.Restricted;
import com.cinchapi.concourse.server.concurrent.LockBroker;
import com.cinchapi.concourse.server.concurrent.LockBroker.Permit;
@@ -53,6 +54,14 @@
import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.Logger;
import com.cinchapi.concourse.util.Transformers;
+import com.cinchapi.ensemble.Broadcast;
+import com.cinchapi.ensemble.EnsembleInstanceIdentifier;
+import com.cinchapi.ensemble.Locator;
+import com.cinchapi.ensemble.Read;
+import com.cinchapi.ensemble.Reduce;
+import com.cinchapi.ensemble.ReturnsEnsemble;
+import com.cinchapi.ensemble.WeakRead;
+import com.cinchapi.ensemble.reduce.SetUnionReducer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -77,7 +86,8 @@
@NotThreadSafe
public class AtomicOperation extends BufferedStore implements
AtomicSupport,
- TokenEventObserver {
+ TokenEventObserver,
+ Distributed {
// NOTE: This class does not need to do any locking on operations (until
// commit time) because it is assumed to be isolated to one thread and the
@@ -90,11 +100,12 @@ public class AtomicOperation extends BufferedStore implements
*
* @param store
* @param broker
+ * @param id
* @return the AtomicOperation
*/
protected static AtomicOperation start(AtomicSupport store,
- LockBroker broker) {
- return new AtomicOperation(store, broker);
+ LockBroker broker, String id) {
+ return new AtomicOperation(store, broker, id);
}
/**
@@ -122,6 +133,11 @@ protected static AtomicOperation start(AtomicSupport store,
*/
protected final LockBroker broker;
+ /**
+ * The unique identifier.
+ */
+ protected final transient String id;
+
/**
* Whenever a nested {@link AtomicOperation} is
* {@link #startAtomicOperation() started}, it, by virtue of being a
@@ -209,9 +225,12 @@ protected static AtomicOperation start(AtomicSupport store,
* Construct a new instance.
*
* @param destination
+ * @param broker
+ * @param id
*/
- protected AtomicOperation(AtomicSupport destination, LockBroker broker) {
- this(new Queue(INITIAL_CAPACITY), destination, broker);
+ protected AtomicOperation(AtomicSupport destination, LockBroker broker,
+ String id) {
+ this(new Queue(INITIAL_CAPACITY), destination, broker, id);
}
/**
@@ -219,12 +238,13 @@ protected AtomicOperation(AtomicSupport destination, LockBroker broker) {
*
* @param buffer
* @param destination
- * @param lockService
- * @param rangeLockService
+ * @param broker
+ * @param id
*/
protected AtomicOperation(Queue buffer, AtomicSupport destination,
- LockBroker broker) {
+ LockBroker broker, String id) {
super(buffer, destination);
+ this.id = id;
this.broker = broker;
this.source = (AtomicSupport) this.durable;
this.unlocked = new BufferedStore(limbo, durable) {
@@ -244,10 +264,33 @@ protected boolean verifyWithReentrancy(Write write) {
source.subscribe(this);
}
+ /**
+ * No-arg constructor per requirement for Ensemble
+ */
+ @DoNotInvoke
+ AtomicOperation() {
+ super(null, null);
+ this.unlocked = null;
+ this.id = null;
+ this.source = null;
+ this.broker = null;
+ }
+
+ @Override
+ public EnsembleInstanceIdentifier $ensembleInstanceIdentifier() {
+ return EnsembleInstanceIdentifier.of(id);
+ }
+
+ @Override
+ public LockBroker $ensembleLockBroker() {
+ return broker;
+ }
+
/**
* Close this operation and release all of the held locks without applying
* any of the changes to the {@link #source} store.
*/
+ @com.cinchapi.ensemble.Write
public void abort() {
if(status.compareAndSet(Status.OPEN, Status.FINALIZING)
|| status.compareAndSet(Status.PENDING, Status.FINALIZING)
@@ -284,7 +327,8 @@ public void accept(Write write, boolean sync) {
}
@Override
- public final boolean add(String key, TObject value, long record)
+ @com.cinchapi.ensemble.Write
+ public boolean add(@Locator String key, TObject value, @Locator long record)
throws AtomicStateException {
return add(Write.add(key, value, record), Sync.NO, Verify.YES);
}
@@ -293,7 +337,8 @@ public final boolean add(String key, TObject value, long record)
public void announce(TokenEvent event, Token... tokens) {}
@Override
- public final Map<TObject, Set<Long>> browse(String key)
+ @Read
+ public Map<TObject, Set<Long>> browse(@Locator String key)
throws AtomicStateException {
checkState();
Text key0 = Text.wrapCached(key);
@@ -307,7 +352,8 @@ public final Map<TObject, Set<Long>> browse(String key)
}
@Override
- public final Map<TObject, Set<Long>> browse(String key, long timestamp)
+ @WeakRead
+ public Map<TObject, Set<Long>> browse(@Locator String key, long timestamp)
throws AtomicStateException {
if(timestamp > Time.now()) {
return browse(key);
@@ -324,8 +370,10 @@ public Map<TObject, Set<Long>> browseUnlocked(String key) {
}
@Override
- public final Map<Long, Set<TObject>> chronologize(String key, long record,
- long start, long end) throws AtomicStateException {
+ @WeakRead
+ public Map<Long, Set<TObject>> chronologize(@Locator String key,
+ @Locator long record, long start, long end)
+ throws AtomicStateException {
checkState();
long now = Time.now();
if(start > now || end > now) {
@@ -376,8 +424,9 @@ public Map<Long, Set<TObject>> chronologizeUnlocked(String key, long record,
* @return {@code true} if the effects of the operation are completely
* applied
*/
- @VisibleForTesting
- public final boolean commit(long version) throws AtomicStateException {
+ @com.cinchapi.ensemble.Write
+ @Broadcast
+ public boolean commit(long version) throws AtomicStateException {
if(prepare()) {
complete(version);
return true;
@@ -389,7 +438,8 @@ public final boolean commit(long version) throws AtomicStateException {
}
@Override
- public final boolean contains(long record) {
+ @Read
+ public boolean contains(@Locator long record) {
checkState();
Token token = Token.wrap(record);
reads2Lock.add(token);
@@ -398,7 +448,9 @@ public final boolean contains(long record) {
}
@Override
- public Map<Long, Set<TObject>> explore(String key, Aliases aliases) {
+ @Read
+ public Map<Long, Set<TObject>> explore(@Locator String key,
+ Aliases aliases) {
checkState();
Operator operator = aliases.operator();
TObject[] values = aliases.values();
@@ -413,7 +465,8 @@ public Map<Long, Set<TObject>> explore(String key, Aliases aliases) {
}
@Override
- public Map<Long, Set<TObject>> explore(String key, Aliases aliases,
+ @WeakRead
+ public Map<Long, Set<TObject>> explore(@Locator String key, Aliases aliases,
long timestamp) {
if(timestamp > Time.now()) {
return explore(key, aliases);
@@ -431,7 +484,8 @@ public Map<Long, Set<TObject>> exploreUnlocked(String key,
}
@Override
- public final Set<TObject> gather(String key, long record)
+ @Read
+ public Set<TObject> gather(@Locator String key, @Locator long record)
throws AtomicStateException {
checkState();
Token token = Token.wrap(key, record);
@@ -440,8 +494,9 @@ public final Set<TObject> gather(String key, long record)
}
@Override
- public final Set<TObject> gather(String key, long record, long timestamp)
- throws AtomicStateException {
+ @WeakRead
+ public Set<TObject> gather(@Locator String key, @Locator long record,
+ long timestamp) throws AtomicStateException {
if(timestamp > Time.now()) {
return gather(key, record);
}
@@ -456,6 +511,14 @@ public Set<TObject> gatherUnlocked(String key, long record) {
return unlocked.gather(key, record);
}
+ @Override
+ @Read
+ @Broadcast
+ @Reduce(SetUnionReducer.class)
+ public Set<Long> getAllRecords() {
+ return super.getAllRecords();
+ }
+
@Override
@Restricted
public boolean observe(TokenEvent event, Token token) {
@@ -507,8 +570,9 @@ public void onCommit(AtomicOperation operation) {
}
@Override
- public final boolean remove(String key, TObject value, long record)
- throws AtomicStateException {
+ @com.cinchapi.ensemble.Write
+ public boolean remove(@Locator String key, TObject value,
+ @Locator long record) throws AtomicStateException {
return remove(Write.remove(key, value, record), Sync.NO, Verify.YES);
}
@@ -516,7 +580,8 @@ public final boolean remove(String key, TObject value, long record)
public final void repair() {/* no-op */}
@Override
- public final Map<Long, List<String>> review(long record)
+ @WeakRead
+ public Map<Long, List<String>> review(@Locator long record)
throws AtomicStateException {
checkState();
Token token = Token.wrap(record);
@@ -526,8 +591,9 @@ public final Map<Long, List<String>> review(long record)
}
@Override
- public final Map<Long, List<String>> review(String key, long record)
- throws AtomicStateException {
+ @WeakRead
+ public Map<Long, List<String>> review(@Locator String key,
+ @Locator long record) throws AtomicStateException {
checkState();
Token token = Token.wrap(key, record);
reads2Lock.add(token);
@@ -545,14 +611,16 @@ public Map<Long, List<String>> reviewUnlocked(String key, long record) {
}
@Override
- public final Set<Long> search(String key, String query)
+ @Read
+ public Set<Long> search(@Locator String key, String query)
throws AtomicStateException {
checkState();
return super.search(key, query);
}
@Override
- public final Map<String, Set<TObject>> select(long record)
+ @Read
+ public Map<String, Set<TObject>> select(@Locator long record)
throws AtomicStateException {
checkState();
Token token = Token.wrap(record);
@@ -562,8 +630,9 @@ public final Map<String, Set<TObject>> select(long record)
}
@Override
- public final Map<String, Set<TObject>> select(long record, long timestamp)
- throws AtomicStateException {
+ @WeakRead
+ public Map<String, Set<TObject>> select(@Locator long record,
+ long timestamp) throws AtomicStateException {
if(timestamp > Time.now()) {
return select(record);
}
@@ -574,7 +643,8 @@ public final Map<String, Set<TObject>> select(long record, long timestamp)
}
@Override
- public final Set<TObject> select(String key, long record)
+ @Read
+ public Set<TObject> select(@Locator String key, @Locator long record)
throws AtomicStateException {
checkState();
Token token = Token.wrap(key, record);
@@ -583,8 +653,9 @@ public final Set<TObject> select(String key, long record)
}
@Override
- public final Set<TObject> select(String key, long record, long timestamp)
- throws AtomicStateException {
+ @WeakRead
+ public Set<TObject> select(@Locator String key, @Locator long record,
+ long timestamp) throws AtomicStateException {
if(timestamp > Time.now()) {
return select(key, record);
}
@@ -605,7 +676,8 @@ public Set<TObject> selectUnlocked(String key, long record) {
}
@Override
- public final void set(String key, TObject value, long record)
+ @com.cinchapi.ensemble.Write
+ public void set(@Locator String key, TObject value, @Locator long record)
throws AtomicStateException {
checkState();
Token token = Token.wrap(key, record);
@@ -631,7 +703,10 @@ public final void set(String key, TObject value, long record)
public final void start() {}
@Override
- public AtomicOperation startAtomicOperation() {
+ @com.cinchapi.ensemble.Write
+ @Broadcast
+ @ReturnsEnsemble
+ public AtomicOperation startAtomicOperation(String id) {
checkState();
/*
* This operation must adhere to the JIT locking guarantees of its
@@ -640,7 +715,7 @@ public AtomicOperation startAtomicOperation() {
* child until it is ready to commit. As a result, we do not pass the
* #source's lock broker to the nested operation.
*/
- return AtomicOperation.start(this, LockBroker.noOp());
+ return AtomicOperation.start(this, LockBroker.noOp(), id);
}
/**
@@ -686,8 +761,9 @@ public void unsubscribe(TokenEventObserver observer) {
}
@Override
- public final boolean verify(String key, TObject value, long record,
- long timestamp) throws AtomicStateException {
+ @WeakRead
+ public boolean verify(@Locator String key, TObject value,
+ @Locator long record, long timestamp) throws AtomicStateException {
if(timestamp > Time.now()) {
return verify(key, value, record);
}
@@ -698,7 +774,8 @@ public final boolean verify(String key, TObject value, long record,
}
@Override
- public final boolean verify(Write write) throws AtomicStateException {
+ @Read
+ public boolean verify(@Locator Write write) throws AtomicStateException {
checkState();
Token token = Token.wrap(write.getKey().toString(),
write.getRecord().longValue());
@@ -860,6 +937,18 @@ protected void apply(boolean syncAndVerify) {
}
}
+ /**
+ * Cancel the operation and set its status to {@link Status#ABORTED},
+ * regardless of its current state.
+ */
+ protected final void cancel() {
+ source.unsubscribe(this);
+ if(locks != null && !locks.isEmpty()) {
+ releaseLocks();
+ }
+ status.set(Status.ABORTED);
+ }
+
/**
* Check if this operation is preempted by any {@link #queued} version
* change announcements.
@@ -896,6 +985,34 @@ protected void checkState() throws AtomicStateException {
checkIfQueuedPreempted();
}
+ /**
+ * The second phase of the {@link #commit(long) commit} protocol: apply the
+ * effects of this operation to the {@link #source}.
+ *
+ * This method requires that the operation have been {@link #prepare()
+ * prepared} so that it is guaranteed that the effects can be applied
+ * without conflict.
+ *
+ *
+ * @param version the {@link Versioned#getVersion() version} to apply to all
+ * the writes in this {@link AtomicOperation}
+ */
+ protected void complete(long version) {
+ if(status.compareAndSet(Status.FINALIZING, Status.FINALIZING)) {
+ limbo.transform(write -> write.rewrite(version));
+ apply();
+ releaseLocks();
+ source.onCommit(this);
+ if(!status.compareAndSet(Status.FINALIZING, Status.COMMITTED)) {
+ throw new IllegalStateException(
+ "Unexpected atomic operation state change");
+ }
+ }
+ else {
+ throwAtomicStateException();
+ }
+ }
+
/**
* Return {@code true} if {@code event} for {@code token} preempts this
* {@link AtomicOperation operation}.
@@ -950,6 +1067,28 @@ protected boolean isReadOnly() {
return ((Queue) limbo).size() == 0;
}
+ /**
+ * Release all of the locks that are held by this operation.
+ */
+ protected void releaseLocks() {
+ if(isReadOnly()) {
+ return;
+ }
+ else if(locks != null) {
+ Set<LockDescription> _locks = locks;
+ locks = null; // CON-172: Set the reference of the locks to null
+ // immediately to prevent a race condition where
+ // the #grabLocks method isn't notified of version
+ // change failure in time
+ for (LockDescription lock : _locks) {
+ lock.unlock(); // We should never encounter an
+ // IllegalMonitorStateException here because a
+ // lock should only go in #locks once it has been
+ // locked.
+ }
+ }
+ }
+
@Override
protected final boolean remove(Write write, Sync sync, Verify verify)
throws AtomicStateException {
@@ -1120,46 +1259,6 @@ private boolean acquireLocks() {
}
}
- /**
- * Cancel the operation and set its status to {@link Status#ABORTED},
- * regardless of its current state.
- */
- private final void cancel() {
- source.unsubscribe(this);
- if(locks != null && !locks.isEmpty()) {
- releaseLocks();
- }
- status.set(Status.ABORTED);
- }
-
- /**
- * The second phase of the {@link #commit(long) commit} protocol: apply the
- * effects of this operation to the {@link #source}.
- *
- * This method requires that the operation have been {@link #prepare()
- * prepared} so that it is guaranteed that the effects can be applied
- * without conflict.
- *
- *
- * @param version the {@link Versioned#getVersion() version} to apply to all
- * the writes in this {@link AtomicOperation}
- */
- private final void complete(long version) {
- if(status.compareAndSet(Status.FINALIZING, Status.FINALIZING)) {
- limbo.transform(write -> write.rewrite(version));
- apply();
- releaseLocks();
- source.onCommit(this);
- if(!status.compareAndSet(Status.FINALIZING, Status.COMMITTED)) {
- throw new IllegalStateException(
- "Unexpected atomic operation state change");
- }
- }
- else {
- throwAtomicStateException();
- }
- }
-
/**
* Return {@code true} if it can immediately be determined that
* {@code event} for {@code token} preempts this {@link AtomicOperation
@@ -1249,28 +1348,6 @@ private final boolean prepare() {
return false;
}
- /**
- * Release all of the locks that are held by this operation.
- */
- private void releaseLocks() {
- if(isReadOnly()) {
- return;
- }
- else if(locks != null) {
- Set<LockDescription> _locks = locks;
- locks = null; // CON-172: Set the reference of the locks to null
- // immediately to prevent a race condition where
- // the #grabLocks method isn't notified of version
- // change failure in time
- for (LockDescription lock : _locks) {
- lock.unlock(); // We should never encounter an
- // IllegalMonitorStateException here because a
- // lock should only go in #locks once it has been
- // locked.
- }
- }
- }
-
/**
* A LockDescription is a wrapper around a {@link Lock} that contains
* metadata that can be serialized to disk. The AtomicOperation grabs a
@@ -1548,4 +1625,4 @@ public void remove(Text key, Range range) {
}
-}
+}
\ No newline at end of file
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicSupport.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicSupport.java
index 4c9cd113d..794532a2d 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicSupport.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/AtomicSupport.java
@@ -15,6 +15,8 @@
*/
package com.cinchapi.concourse.server.storage;
+import java.util.UUID;
+
/**
* A {@link LockFreeStore} that can initiate and therefore serve as the
* destination for an {@link AtomicOperation}.
@@ -41,7 +43,20 @@ public interface AtomicSupport extends
*
* @return the {@link AtomicOperation}
*/
- public AtomicOperation startAtomicOperation();
+ public default AtomicOperation startAtomicOperation() {
+ return startAtomicOperation(UUID.randomUUID().toString());
+ }
+
+ /**
+ * Return an {@link AtomicOperation} that can be used to group actions that
+ * should all succeed or fail together. Use {@link AtomicOperation#commit()}
+ * to apply the action to this store or use {@link AtomicOperation#abort()}
+ * to cancel.
+ *
+ * @param id
+ * @return the {@link AtomicOperation}
+ */
+ public AtomicOperation startAtomicOperation(String id);
/**
* Perform any additional cleanup that should happen after successfully
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/BufferedStore.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/BufferedStore.java
index d252799bb..0ca2e4c68 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/BufferedStore.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/BufferedStore.java
@@ -15,6 +15,7 @@
*/
package com.cinchapi.concourse.server.storage;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -313,9 +314,20 @@ public Map<Long, List<String>> review(String key, long record) {
@Override
public Set<Long> search(String key, String query) {
- // FIXME: should this be implemented using a context instead?
- return Sets.symmetricDifference(limbo.search(key, query),
- durable.search(key, query));
+ Set<Long> context = durable.search(key, query);
+ Set<Long> latest = limbo.search(key, query);
+ Set<Long> xor = new LinkedHashSet<>();
+ for (long record : context) {
+ if(!latest.contains(record)) {
+ xor.add(record);
+ }
+ }
+ for (long record : latest) {
+ if(!context.contains(record)) {
+ xor.add(record);
+ }
+ }
+ return xor;
}
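The rewritten loop preserves the semantics of the removed Sets.symmetricDifference call: a record is reported only if it matches the query in exactly one of the durable store and the limbo buffer, so a match present in both cancels out. A minimal, standalone illustration of that contract (record ids are arbitrary):

import java.util.LinkedHashSet;
import java.util.Set;

public class SymmetricDifferenceSketch {
    public static void main(String[] args) {
        Set<Long> durable = Set.of(1L, 2L, 3L); // matches already transported
        Set<Long> limbo = Set.of(3L, 4L);       // matches still buffered
        Set<Long> xor = new LinkedHashSet<>();
        for (long record : durable) {
            if(!limbo.contains(record)) {
                xor.add(record);
            }
        }
        for (long record : limbo) {
            if(!durable.contains(record)) {
                xor.add(record);
            }
        }
        System.out.println(xor); // contains 1, 2 and 4; record 3 cancels out
    }
}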
@Override
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Distributed.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Distributed.java
new file mode 100644
index 000000000..0532ec6ef
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Distributed.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.storage;
+
+import java.util.Map;
+
+import com.cinchapi.concourse.server.concurrent.LockBroker;
+import com.cinchapi.concourse.server.storage.AtomicOperation.Status;
+import com.cinchapi.concourse.time.TimeSource;
+import com.cinchapi.ensemble.Ensemble;
+import com.cinchapi.ensemble.EnsembleInstanceIdentifier;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * A {@link Store} that is distributed through the {@link Ensemble} framework.
+ *
+ * This interface provides default adherence to {@link Ensemble Ensemble's}
+ * two-phase commit protocol via the {@link TwoPhaseCommit} construct. This
+ * interface automatically manages the registration and lifecycle of
+ * {@link TwoPhaseCommit} instances so that they can be operated on by
+ * individual nodes within an {@link Ensemble} cluster.
+ *
+ *
+ * @author Jeff Nelson
+ */
+public interface Distributed extends AtomicSupport, Ensemble {
+
+ @Override
+ public default void $ensembleAbortAtomic(
+ EnsembleInstanceIdentifier identifier) {
+ TwoPhaseCommit atomic = TwoPhaseCommit.allocator().get(this,
+ identifier);
+ try {
+ atomic.abort();
+ }
+ finally {
+ atomic.deallocate();
+ }
+ }
+
+ @Override
+ public default void $ensembleFinishCommitAtomic(
+ EnsembleInstanceIdentifier identifier) {
+ TwoPhaseCommit atomic = TwoPhaseCommit.allocator().get(this,
+ identifier);
+ try {
+ atomic.finish();
+ }
+ finally {
+ atomic.deallocate();
+ }
+ }
+
+ /**
+ * Mapping of {@link AtomicOperation} methods that must be intercepted when
+ * a {@link TwoPhaseCommit} is being used to coordinate atomic state change
+ * operations against a distributed atomic operation.
+ */
+ static Map<String, String> localAtomicOperationIntercepts = ImmutableMap
+ .of("commit", "$commit", "abort", "$abort");
+
+ @Override
+ default <T> T $ensembleInvokeAtomic(EnsembleInstanceIdentifier identifier,
+ String method, Object... args) {
+ String intercept = localAtomicOperationIntercepts.get(method);
+ if(intercept != null) {
+ if(TwoPhaseCommit.allocator().intercept(this, identifier)) {
+ method = intercept;
+ }
+ else {
+ throw new IllegalStateException(method
+ + " is marked for interception "
+ + "during local atomic operations, but interception failed");
+ }
+ }
+ return Ensemble.super.$ensembleInvokeAtomic(identifier, method, args);
+ }
+
+ /**
+ * Return the {@link LockBroker} that should be used for distributed
+ * operations.
+ *
+ * @return the {@link LockBroker} instance
+ */
+ public LockBroker $ensembleLockBroker();
+
+ @Override
+ public default boolean $ensemblePrepareCommitAtomic(
+ EnsembleInstanceIdentifier identifier, long timestamp) {
+ TwoPhaseCommit atomic = TwoPhaseCommit.allocator().get(this,
+ identifier);
+ if(atomic.status() == Status.FINALIZING) {
+ // In cases where an Ensemble Tandem includes multiple Cohorts with
+ // the same leader node, the #2pc will be committed more than once.
+ return true;
+ }
+ else {
+ timestamp = TimeSource.get().interpret(timestamp);
+ return atomic.commit(timestamp);
+ }
+ }
+
+ @Override
+ public default void $ensembleStartAtomic(
+ EnsembleInstanceIdentifier identifier) {
+ TwoPhaseCommit.allocator().allocate(identifier, this,
+ $ensembleLockBroker());
+ }
+
+ @Override
+ default Ensemble $ensembleLocateAtomicInstance(
+ EnsembleInstanceIdentifier identifier) {
+ return TwoPhaseCommit.allocator().get(this, identifier);
+ }
+
+}
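For orientation, the default methods above are the hooks through which Ensemble drives two-phase commit against a Distributed store. The hypothetical driver below only spells out the ordering implied by this interface; application code never calls these hooks directly, and the timestamp is a placeholder.

// Hypothetical ordering sketch; in practice the Ensemble framework invokes these hooks.
static void sketchTwoPhaseCommit(Distributed store,
        EnsembleInstanceIdentifier id, long timestamp) {
    store.$ensembleStartAtomic(id); // allocate the TwoPhaseCommit for this id
    // ... Ensemble routes the operation's reads/writes via $ensembleInvokeAtomic ...
    if(store.$ensemblePrepareCommitAtomic(id, timestamp)) {
        // Phase 1 succeeded on this participant: locks are held, effects staged
        store.$ensembleFinishCommitAtomic(id); // Phase 2: apply and deallocate
    }
    else {
        store.$ensembleAbortAtomic(id); // release resources without applying
    }
}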
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Engine.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Engine.java
index 4e3d520c9..334985a19 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Engine.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Engine.java
@@ -60,6 +60,14 @@
import com.cinchapi.concourse.thrift.TObject.Aliases;
import com.cinchapi.concourse.util.Logger;
import com.cinchapi.concourse.util.Transformers;
+import com.cinchapi.ensemble.Broadcast;
+import com.cinchapi.ensemble.EnsembleInstanceIdentifier;
+import com.cinchapi.ensemble.Locator;
+import com.cinchapi.ensemble.Read;
+import com.cinchapi.ensemble.Reduce;
+import com.cinchapi.ensemble.ReturnsEnsemble;
+import com.cinchapi.ensemble.WeakRead;
+import com.cinchapi.ensemble.reduce.SetUnionReducer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -78,9 +86,10 @@
* @author Jeff Nelson
*/
@ThreadSafe
-public final class Engine extends BufferedStore implements
+public class Engine extends BufferedStore implements
TransactionSupport,
- AtomicSupport {
+ AtomicSupport,
+ Distributed {
//
// NOTES ON LOCKING:
@@ -189,7 +198,7 @@ private static ReentrantReadWriteLock createTransportLock() {
* {@link Database} in the default locations.
*
*/
- public Engine() {
+ Engine() {
this(new Buffer(), new Database(), GlobalState.DEFAULT_ENVIRONMENT);
}
@@ -242,6 +251,16 @@ private Engine(Buffer buffer, Database database, String environment) {
database.tag(environment);
}
+ @Override
+ public EnsembleInstanceIdentifier $ensembleInstanceIdentifier() {
+ return EnsembleInstanceIdentifier.of(environment);
+ }
+
+ @Override
+ public LockBroker $ensembleLockBroker() {
+ return broker;
+ }
+
@Override
@DoNotInvoke
public void accept(Write write) {
@@ -301,7 +320,9 @@ long record = write.getRecord().longValue();
}
@Override
- public boolean add(String key, TObject value, long record) {
+ @com.cinchapi.ensemble.Write
+ public boolean add(@Locator String key, TObject value,
+ @Locator long record) {
transportLock.readLock().lock();
Token sharedToken = Token.shareable(record);
Token writeToken = Token.wrap(key, record);
@@ -361,7 +382,8 @@ public void announce(TokenEvent event, Token... tokens) {
}
@Override
- public Map<TObject, Set<Long>> browse(String key) {
+ @Read
+ public Map<TObject, Set<Long>> browse(@Locator String key) {
transportLock.readLock().lock();
RangeToken token = RangeToken.forReading(Text.wrapCached(key),
Operator.BETWEEN, Value.NEGATIVE_INFINITY,
@@ -377,7 +399,8 @@ public Map<TObject, Set<Long>> browse(String key) {
}
@Override
- public Map<TObject, Set<Long>> browse(String key, long timestamp) {
+ @WeakRead
+ public Map<TObject, Set<Long>> browse(@Locator String key, long timestamp) {
transportLock.readLock().lock();
try {
return super.browse(key, timestamp);
@@ -399,8 +422,9 @@ public Map<TObject, Set<Long>> browseUnlocked(String key) {
}
@Override
- public Map<Long, Set<TObject>> chronologize(String key, long record,
- long start, long end) {
+ @WeakRead
+ public Map<Long, Set<TObject>> chronologize(@Locator String key,
+ @Locator long record, long start, long end) {
transportLock.readLock().lock();
Token token = Token.wrap(record);
Permit read = broker.readLock(token);
@@ -433,7 +457,8 @@ public void compact() {
}
@Override
- public boolean contains(long record) {
+ @Read
+ public boolean contains(@Locator long record) {
return inventory.contains(record);
}
@@ -452,7 +477,9 @@ public String dump(String id) {
}
@Override
- public Map<Long, Set<TObject>> explore(String key, Aliases aliases) {
+ @Read
+ public Map<Long, Set<TObject>> explore(@Locator String key,
+ Aliases aliases) {
transportLock.readLock().lock();
RangeToken token = RangeToken.forReading(Text.wrapCached(key),
aliases.operator(), Transformers.transformArray(
@@ -468,7 +495,8 @@ public Map<Long, Set<TObject>> explore(String key, Aliases aliases) {
}
@Override
- public Map<Long, Set<TObject>> explore(String key, Aliases aliases,
+ @WeakRead
+ public Map<Long, Set<TObject>> explore(@Locator String key, Aliases aliases,
long timestamp) {
transportLock.readLock().lock();
try {
@@ -492,7 +520,8 @@ public Map<Long, Set<TObject>> exploreUnlocked(String key,
}
@Override
- public Set<TObject> gather(String key, long record) {
+ @Read
+ public Set<TObject> gather(@Locator String key, @Locator long record) {
transportLock.readLock().lock();
Token token = Token.wrap(key, record);
Permit read = broker.readLock(token);
@@ -506,7 +535,9 @@ public Set<TObject> gather(String key, long record) {
}
@Override
- public Set<TObject> gather(String key, long record, long timestamp) {
+ @WeakRead
+ public Set<TObject> gather(@Locator String key, @Locator long record,
+ long timestamp) {
transportLock.readLock().lock();
try {
return super.gather(key, record, timestamp);
@@ -528,6 +559,9 @@ public Set<TObject> gatherUnlocked(String key, long record) {
}
@Override
+ @Read
+ @Broadcast
+ @Reduce(SetUnionReducer.class)
public Set<Long> getAllRecords() {
return inventory.getAll();
}
@@ -553,7 +587,9 @@ public String getDumpList() {
}
@Override
- public boolean remove(String key, TObject value, long record) {
+ @com.cinchapi.ensemble.Write
+ public boolean remove(@Locator String key, TObject value,
+ @Locator long record) {
transportLock.readLock().lock();
Token sharedToken = Token.shareable(record);
Token writeToken = Token.wrap(key, record);
@@ -619,7 +655,8 @@ long record = it.next().getRecord().longValue();
}
@Override
- public Map<Long, List<String>> review(long record) {
+ @WeakRead
+ public Map<Long, List<String>> review(@Locator long record) {
transportLock.readLock().lock();
Token token = Token.shareable(record);
Permit read = broker.readLock(token);
@@ -633,7 +670,9 @@ public Map<Long, List<String>> review(long record) {
}
@Override
- public Map<Long, List<String>> review(String key, long record) {
+ @WeakRead
+ public Map<Long, List<String>> review(@Locator String key,
+ @Locator long record) {
transportLock.readLock().lock();
Token token = Token.wrap(key, record);
Permit read = broker.readLock(token);
@@ -669,7 +708,8 @@ public Map<Long, List<String>> reviewUnlocked(String key, long record) {
}
@Override
- public Set<Long> search(String key, String query) {
+ @Read
+ public Set<Long> search(@Locator String key, String query) {
// NOTE: Range locking for a search query requires too much overhead, so
// we must be willing to live with the fact that a search query may
// provide inconsistent results if a match is added while the read is
@@ -684,7 +724,8 @@ public Set<Long> search(String key, String query) {
}
@Override
- public Map<String, Set<TObject>> select(long record) {
+ @Read
+ public Map<String, Set<TObject>> select(@Locator long record) {
transportLock.readLock().lock();
Token token = Token.shareable(record);
Permit read = broker.readLock(token);
@@ -698,7 +739,9 @@ public Map<String, Set<TObject>> select(long record) {
}
@Override
- public Map<String, Set<TObject>> select(long record, long timestamp) {
+ @WeakRead
+ public Map<String, Set<TObject>> select(@Locator long record,
+ long timestamp) {
transportLock.readLock().lock();
try {
return super.select(record, timestamp);
@@ -709,7 +752,8 @@ public Map<String, Set<TObject>> select(long record, long timestamp) {
}
@Override
- public Set<TObject> select(String key, long record) {
+ @Read
+ public Set<TObject> select(@Locator String key, @Locator long record) {
transportLock.readLock().lock();
Token token = Token.wrap(key, record);
Permit read = broker.readLock(token);
@@ -723,7 +767,9 @@ public Set<TObject> select(String key, long record) {
}
@Override
- public Set<TObject> select(String key, long record, long timestamp) {
+ @WeakRead
+ public Set<TObject> select(@Locator String key, @Locator long record,
+ long timestamp) {
transportLock.readLock().lock();
try {
return super.select(key, record, timestamp);
@@ -756,7 +802,8 @@ public Set<TObject> selectUnlocked(String key, long record) {
}
@Override
- public void set(String key, TObject value, long record) {
+ @com.cinchapi.ensemble.Write
+ public void set(@Locator String key, TObject value, @Locator long record) {
transportLock.readLock().lock();
Token sharedToken = Token.shareable(record);
Token writeToken = Token.wrap(key, record);
@@ -794,13 +841,17 @@ public void start() {
}
@Override
- public AtomicOperation startAtomicOperation() {
- return AtomicOperation.start(this, broker);
+ @ReturnsEnsemble
+ public AtomicOperation startAtomicOperation(String id) {
+ return AtomicOperation.start(this, broker, id);
}
@Override
- public Transaction startTransaction() {
- return Transaction.start(this);
+ @com.cinchapi.ensemble.Write
+ @Broadcast
+ @ReturnsEnsemble
+ public Transaction startTransaction(String id) {
+ return Transaction.start(this, id);
}
@Override
@@ -834,7 +885,8 @@ public void unsubscribe(TokenEventObserver observer) {
}
@Override
- public boolean verify(Write write) {
+ @Read
+ public boolean verify(@Locator Write write) {
transportLock.readLock().lock();
Token token = Token.wrap(write.getKey().toString(),
write.getRecord().longValue());
@@ -849,7 +901,8 @@ public boolean verify(Write write) {
}
@Override
- public boolean verify(Write write, long timestamp) {
+ @WeakRead
+ public boolean verify(@Locator Write write, long timestamp) {
transportLock.readLock().lock();
try {
return super.verify(write, timestamp);
@@ -981,4 +1034,5 @@ private boolean removeUnlocked(Write write, Sync sync) {
verify == Verify.YES);
}
}
+
}
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Transaction.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Transaction.java
index 18e4aa7e0..58f729d14 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Transaction.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/Transaction.java
@@ -27,13 +27,16 @@
import com.cinchapi.common.base.CheckedExceptions;
import com.cinchapi.common.io.ByteBuffers;
+import com.cinchapi.concourse.annotate.DoNotInvoke;
+import com.cinchapi.concourse.server.concurrent.LockBroker;
import com.cinchapi.concourse.server.io.ByteableCollections;
import com.cinchapi.concourse.server.io.FileSystem;
import com.cinchapi.concourse.server.storage.temp.Queue;
import com.cinchapi.concourse.server.storage.temp.ToggleQueue;
import com.cinchapi.concourse.server.storage.temp.Write;
-import com.cinchapi.concourse.time.Time;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.Logger;
+import com.google.common.annotations.VisibleForTesting;
/**
* An {@link AtomicOperation} that performs backups prior to commit to make sure
@@ -46,7 +49,7 @@
* @author Jeff Nelson
*/
@NotThreadSafe
-public final class Transaction extends AtomicOperation {
+public class Transaction extends AtomicOperation implements Distributed {
/**
* Return the Transaction for {@code destination} that is backed up to
@@ -61,7 +64,7 @@ public static void recover(Engine destination, String file) {
try {
ByteBuffer bytes = FileSystem.map(file, MapMode.READ_ONLY, 0,
FileSystem.getFileSize(file));
- Transaction transaction = new Transaction(destination, bytes);
+ Transaction transaction = new Transaction(destination, bytes, file);
transaction.invokeSuperApply(true); // recovering transaction
// must always syncAndVerify
// to prevent possible data
@@ -82,30 +85,34 @@ public static void recover(Engine destination, String file) {
}
/**
- * Return a new Transaction with {@code engine} as the eventual destination.
+ * Return a new {@link Transaction} with {@code engine} as the eventual
+ * destination.
*
* @param engine
- * @return the new Transaction
+ * @param id
+ * @return the new {@link Transaction}
*/
- public static Transaction start(Engine engine) {
- return new Transaction(engine);
+ public static Transaction start(Engine engine, String id) {
+ return new Transaction(engine, id);
}
/**
- * The unique Transaction id.
+ * Return a new {@link Transaction} with {@code engine} as the eventual
+ * destination.
+ *
+ * @param engine
+ * @return the new {@link Transaction}
*/
- private final String id;
+ @VisibleForTesting
+ static Transaction start(Engine engine) {
+ return start(engine, Long.toString(Identifiers.next()));
+ }
/**
* Construct a new instance.
- *
- * @param destination
*/
- private Transaction(Engine destination) {
- super(new ToggleQueue(INITIAL_CAPACITY), destination,
- destination.broker);
- this.id = Long.toString(Time.now());
- }
+ @DoNotInvoke
+ Transaction() {}
/**
* Construct a new instance.
@@ -113,13 +120,29 @@ private Transaction(Engine destination) {
* @param destination
* @param bytes
*/
- private Transaction(Engine destination, ByteBuffer bytes) {
- this(destination);
+ private Transaction(Engine destination, ByteBuffer bytes, String id) {
+ this(destination, id);
deserialize(bytes);
setStatus(Status.COMMITTED);
}
+ /**
+ * Construct a new instance.
+ *
+ * @param destination
+ * @param id
+ */
+ private Transaction(Engine destination, String id) {
+ super(new ToggleQueue(INITIAL_CAPACITY), destination,
+ destination.broker, id);
+ }
+
+ @Override
+ public LockBroker $ensembleLockBroker() {
+ return ((Engine) durable).broker;
+ }
+
@Override
public void abort() {
super.abort();
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TransactionSupport.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TransactionSupport.java
index c1f821435..7a1db8fe2 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TransactionSupport.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TransactionSupport.java
@@ -15,6 +15,8 @@
*/
package com.cinchapi.concourse.server.storage;
+import com.cinchapi.concourse.util.Identifiers;
+
/**
* A store that can initiate and therefore serve as the destination for a
* {@link Transaction}.
@@ -28,6 +30,16 @@ public interface TransactionSupport {
*
* @return the Transaction
*/
- public Transaction startTransaction();
+ public default Transaction startTransaction() {
+ return startTransaction(Long.toString(Identifiers.next()));
+ }
+
+ /**
+ * Start a new {@link Transaction}.
+ *
+ * @param id
+ * @return the Transaction
+ */
+ public Transaction startTransaction(String id);
}
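This mirrors the startAtomicOperation split above: local callers let the store mint a transaction id, while the id-accepting overload allows the same logical transaction to be labeled identically on every replica. A hedged usage sketch (the engine variables are assumed to be in scope, and the coordination shown is only illustrative since the @Broadcast machinery normally handles it):

// Local use: the default method mints an id via Identifiers.next()
Transaction local = engine.startTransaction();

// Distributed use (illustrative): pick one id and reuse it on each replica so
// that the replicas' transactions correspond to a single logical unit of work.
String id = Long.toString(Identifiers.next());
Transaction onReplicaA = engineA.startTransaction(id);
Transaction onReplicaB = engineB.startTransaction(id);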
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TwoPhaseCommit.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TwoPhaseCommit.java
new file mode 100644
index 000000000..ea6bc2446
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/TwoPhaseCommit.java
@@ -0,0 +1,451 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.storage;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nullable;
+
+import com.cinchapi.common.reflect.Reflection;
+import com.cinchapi.concourse.server.concurrent.LockBroker;
+import com.cinchapi.concourse.util.Logger;
+import com.cinchapi.ensemble.Ensemble;
+import com.cinchapi.ensemble.EnsembleInstanceIdentifier;
+
+/**
+ * An {@link AtomicOperation} that implements the
+ * Two-phase
+ * Commit Protocol for distributed operations.
+ *
+ * This class provides a sophisticated mechanism for coordinating distributed
+ * atomic operations across multiple nodes in an {@link Ensemble} cluster. The
+ * key aspects of this implementation are:
+ *
+ *
+ * - Two-Phase Nature: Unlike a standard {@link AtomicOperation}, this
+ * does not apply operations when it is {@link #commit() committed}. Instead it
+ * only grabs locks. To apply the operations, it must be explicitly
+ * {@link #finish() finished} (in adherence with its two-phase nature).
+ * - Resource Blocking: Since there is no "rollback" for committed
+ * {@link AtomicOperation atomic operations}, this class provides data
+ * consistency guarantees to participants in a distributed operation by
+ * indefinitely blocking resources in the event that the coordinator or a
+ * participant is unable to mark a committed transaction as "finished" (in lieu
+ * of instructing participants to rollback changes).
+ * - Method Interception: The class uses a sophisticated method
+ * interception scheme that is only activated when the destination store is an
+ * {@link AtomicOperation}. This is necessary because:
+ *
+ * - {@link TwoPhaseCommit} extends {@link AtomicOperation} and uses its
+ * built-in lifecycle methods (commit, abort, etc.) to coordinate the
+ * distributed protocol
+ * - When the destination store is also an {@link AtomicOperation}, these
+ * same lifecycle methods must be intercepted to ensure proper coordination of
+ * both the distributed protocol and the local atomic operation
+ * - The interception allows the coordinator's calls to control the
+ * distributed operation lifecycle to be properly routed to both the
+ * distributed coordination and the local atomic operation
+ *
+ * If the destination store is not an {@link AtomicOperation}, no interception
+ * is needed as there is no local atomicity to preserve beyond the distributed
+ * coordination.
+ *
+ *
+ * The coordination flow works as follows:
+ *
+ *
+ * - When a distributed operation begins, the coordinator creates a
+ * {@link TwoPhaseCommit} instance via the {@link Allocator}.
+ * - The coordinator then initiates the two-phase commit protocol by calling
+ * {@link #commit()} on each participant's {@link TwoPhaseCommit} instance.
+ * - During the commit phase:
+ *
+ * - If the destination store is an {@link AtomicOperation}, method calls are
+ * intercepted to ensure proper coordination of both the distributed protocol
+ * and the local atomic operation. This is necessary because the same lifecycle
+ * methods are used for both purposes.
+ * - If the destination store is not an {@link AtomicOperation}, no
+ * interception is needed as the distributed coordination is sufficient.
+ *
+ *
+ * - Once all participants have successfully committed, the coordinator calls
+ * {@link #finish()} on each participant's {@link TwoPhaseCommit} instance to
+ * complete the distributed operation.
+ * - If any participant fails during the commit phase, the coordinator can
+ * call {@link #abort()} to cancel the distributed operation.
+ *
+ *
+ * This construct is necessary to support distributed atomic operations using
+ * the {@link Ensemble} framework and ensures proper coordination of both
+ * distributed operations and, when needed, local atomic operations.
+ *
+ *
+ * @author Jeff Nelson
+ */
+class TwoPhaseCommit extends AtomicOperation {
+
+ /**
+ * Return the {@link Allocator} that manages {@link TwoPhaseCommit}
+ * instances.
+ *
+ * @return the {@link Allocator}
+ */
+ public static Allocator allocator() {
+ return ALLOCATOR;
+ }
+
+ /**
+ * The canonical {@link #allocator()}.
+ */
+ private static final Allocator ALLOCATOR = new Allocator();
+
+ /**
+ * The {@link EnsembleInstanceIdentifier} that is assigned when
+ * {@link Ensemble#$ensembleStartAtomic(EnsembleInstanceIdentifier)} is
+ * called. This identifier is used to track the distributed operation across
+ * all participating nodes.
+ */
+ private final EnsembleInstanceIdentifier identifier;
+
+ /**
+ * The version to assign when {@link #finish() finishing the commit}. This
+ * version is used to ensure consistency across all participating nodes in
+ * the distributed operation.
+ */
+ protected Long version = null;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param identifier the unique identifier for this commit, used to track
+ * the distributed operation across all participating nodes
+ * @param destination the destination for atomic operations, typically a
+ * {@link Store} instance
+ * @param broker the lock broker to use for coordinating access to shared
+ * resources
+ */
+ TwoPhaseCommit(EnsembleInstanceIdentifier identifier,
+ AtomicSupport destination, LockBroker broker) {
+ super(destination, broker, null);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void $ensembleAbortAtomic(EnsembleInstanceIdentifier identifier) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void $ensembleFinishCommitAtomic(
+ EnsembleInstanceIdentifier identifier) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public EnsembleInstanceIdentifier $ensembleInstanceIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public <T> T $ensembleInvokeAtomic(EnsembleInstanceIdentifier identifier,
+ String method, Object... args) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Ensemble $ensembleLocateAtomicInstance(
+ EnsembleInstanceIdentifier identifier) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean $ensemblePrepareCommitAtomic(
+ EnsembleInstanceIdentifier identifier, long timestamp) {
+ return false;
+ }
+
+ @Override
+ public void $ensembleStartAtomic(EnsembleInstanceIdentifier identifier) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void abort() {
+ super.cancel();
+ Logger.debug("Canceled two phase commit {}", this);
+ }
+
+ /**
+ * Finish the {@link #commit()} and release all the locks that were grabbed.
+ * This method is called by the coordinator after all participants have
+ * successfully committed their changes. It ensures that the distributed
+ * operation is completed atomically across all participating nodes.
+ *
+ * @throws AtomicStateException if the commit has not been prepared or if
+ * the distributed operation is in an invalid state
+ */
+ public void finish() {
+ if(version != null) {
+ super.complete(version);
+ Logger.debug("Finished two phase commit {}", this);
+ }
+ else {
+ throwAtomicStateException();
+ }
+ }
+
+ @Override
+ protected void complete(long version) {
+ this.version = version;
+ // Don't actually perform the completion work. This ensures that the
+ // only thing that happens when #commit() is called is that the locks
+ // are acquired. The actual completion work happens when #finish() is
+ // called.
+ Logger.debug("Completed two phase commit {} at version {}", this,
+ version);
+ }
+
+ /**
+ * Deallocate this {@link TwoPhaseCommit}. This method is called when the
+ * distributed operation is complete and the resources used by this commit
+ * can be released.
+ *
+ * @return {@code true} if this is deallocated successfully
+ */
+ boolean deallocate() {
+ return allocator().deallocate((AtomicSupport) durable, identifier);
+ }
+
+ /**
+ * Create an interceptor for this {@link TwoPhaseCommit} to handle method
+ * interception during distributed atomic operations. The interceptor is
+ * only created when the destination store is an {@link AtomicOperation},
+ * as it is needed to coordinate both the distributed protocol and the local
+ * atomic operation using the same lifecycle methods.
+ *
+ * @return a new {@link TwoPhaseCommitInterceptor} that will handle method
+ * interception for this commit
+ */
+ private TwoPhaseCommit intercept() {
+ return new TwoPhaseCommitInterceptor(this);
+ }
+
+ /**
+ * Check if this {@link TwoPhaseCommit} is being used to coordinate a
+ * distributed {@link AtomicOperation}. This is used to determine if method
+ * interception is necessary to coordinate both the distributed protocol and
+ * the local atomic operation.
+ *
+ * @return {@code true} if this is coordinating a distributed operation
+ * where the destination store is an {@link AtomicOperation} and
+ * method interception is required
+ */
+ private boolean isLocalAtomicOperation() {
+ return this.durable instanceof AtomicOperation
+ && !(this.durable instanceof TwoPhaseCommit);
+ }
+
+ /**
+ * Provides methods to control the lifecycle of {@link TwoPhaseCommit}
+ * instances.
+ *
+ * @author Jeff Nelson
+ */
+ static class Allocator {
+
+ /**
+ * The {@link TwoPhaseCommit TwoPhaseCommits} that are currently
+ * {@link #allocate(EnsembleInstanceIdentifier, AtomicSupport, LockBroker)
+ * allocated}. This map is used to track all active distributed
+ * operations and ensure proper coordination across all participating
+ * nodes.
+ *
+ * @implNote It is assumed that assigned EnsembleInstanceIdentifiers are
+ * unique across destinations, so those are not maintained
+ * within the map.
+ */
+ private final Map<EnsembleInstanceIdentifier, TwoPhaseCommit> commits = new ConcurrentHashMap<>();
+
+ /**
+ * Construct a new instance.
+ */
+ private Allocator() { /* no init */}
+
+ /**
+ * Allocate a new {@link TwoPhaseCommit} that is
+ * {@link #get(AtomicSupport, EnsembleInstanceIdentifier) retrievable}
+ * with {@code identifier} and acts as a sandbox for the
+ * {@code destination} using the {@code broker}. This method is called
+ * by the coordinator when initiating a distributed operation.
+ *
+ * @param identifier the unique identifier for the commit, used to track
+ * the distributed operation
+ * @param destination the destination for atomic operations, typically a
+ * {@link Store} instance
+ * @param broker the lock broker to use for coordinating access to
+ * shared resources
+ * @return the allocated {@link TwoPhaseCommit} that will coordinate the
+ * distributed operation
+ */
+ public TwoPhaseCommit allocate(EnsembleInstanceIdentifier identifier,
+ AtomicSupport destination, LockBroker broker) {
+ return commits.computeIfAbsent(identifier,
+ $ -> new TwoPhaseCommit($, destination, broker));
+ }
+
+ /**
+ * Deallocate the {@link TwoPhaseCommit}. This method is called when the
+ * distributed operation is complete and the resources used by the
+ * commit
+ * can be released.
+ *
+ * @param destination the destination that owns the commit
+ * @param identifier the identifier of the commit to deallocate
+ * @return {@code true} if the commit was deallocated successfully
+ */
+ public boolean deallocate(AtomicSupport destination,
+ EnsembleInstanceIdentifier identifier) {
+ return commits.remove(identifier) != null;
+ }
+
+ /**
+ * Return the allocated {@link TwoPhaseCommit} with {@code identifier}
+ * that is an extension of {@code destination}. This method is used to
+ * retrieve the commit instance for a specific distributed operation.
+ *
+ * @param destination the destination that owns the commit
+ * @param identifier the identifier of the commit to retrieve
+ * @return the {@link TwoPhaseCommit} or {@code null} if it does not
+ * exist
+ */
+ @Nullable
+ public TwoPhaseCommit get(AtomicSupport destination,
+ EnsembleInstanceIdentifier identifier) {
+ return commits.get(identifier);
+ }
+
+ /**
+ * Intercept method calls on the {@link TwoPhaseCommit} for the given
+ * destination and identifier. This method is only called when the
+ * destination store is an {@link AtomicOperation}, as it is needed to
+ * coordinate both the distributed protocol and the local atomic
+ * operation using the same lifecycle methods.
+ *
+ * @param destination the destination that owns the commit
+ * @param identifier the identifier of the commit to intercept
+ * @return {@code true} if the interception was successful
+ */
+ public boolean intercept(AtomicSupport destination,
+ EnsembleInstanceIdentifier identifier) {
+ TwoPhaseCommit commit = get(destination, identifier);
+ if(commit != null && commit.isLocalAtomicOperation()) {
+ return allocator().commits.replace(identifier, commit,
+ commit.intercept());
+ }
+ return false;
+ }
+ }
+
+ /**
+ * A {@link TwoPhaseCommit} that intercepts method calls to coordinate
+ * distributed atomic operations. This class is only used when the
+ * destination store is an {@link AtomicOperation}, as it is needed to
+ * coordinate both the distributed protocol and the local atomic operation
+ * using the same lifecycle methods.
+ *
+ * The interceptor works by wrapping the original {@link TwoPhaseCommit} and
+ * intercepting method calls to ensure proper coordination of both the
+ * distributed protocol and the local atomic operation. It maintains a
+ * reference to the local {@link AtomicOperation} being coordinated and
+ * ensures that the coordinator's lifecycle control methods are properly
+ * routed to both the distributed coordination and the local atomic
+ * operation.
+ *
+ *
+ * @author Jeff Nelson
+ */
+ private static class TwoPhaseCommitInterceptor extends TwoPhaseCommit {
+
+ /**
+ * The local {@link AtomicOperation} being coordinated. This is the
+ * original operation that is being distributed across multiple nodes
+ * and
+ * must maintain atomicity both within the local node and across the
+ * distributed system.
+ */
+ final AtomicOperation local;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param commit the commit to intercept, which contains the original
+ * atomic operation being coordinated
+ */
+ TwoPhaseCommitInterceptor(TwoPhaseCommit commit) {
+ super(commit.identifier, (AtomicOperation) commit.durable,
+ commit.broker);
+ this.local = (AtomicOperation) commit.durable;
+ }
+
+ @Override
+ public void finish() {
+ if(status() == Status.FINALIZING) {
+ local.complete(version);
+ Logger.debug(
+ "A distributed atomic operation has been applied locally");
+ }
+ super.finish();
+ }
+
+ /**
+ * Abort the local atomic operation. This method is called when the
+ * distributed operation needs to be cancelled, typically due to a
+ * failure in one of the participating nodes. It ensures that both the
+ * distributed protocol and the local atomic operation are properly
+ * aborted.
+ */
+ @SuppressWarnings("unused") // called via Reflection
+ void $abort() {
+ local.abort();
+ }
+
+ /**
+ * Commit the local atomic operation. This method is called during the
+ * first phase of the two-phase commit protocol to prepare the local
+ * operation for commitment. It ensures that both the distributed
+ * protocol and the local atomic operation are properly prepared.
+ *
+ * @param version the version to commit at, which ensures consistency
+ * across all participating nodes
+ * @return {@code true} if the commit was successful
+ * @throws AtomicStateException if the commit cannot be performed due to
+ * an invalid state
+ */
+ @SuppressWarnings("unused") // called via Reflection
+ boolean $commit(long version) throws AtomicStateException {
+ if(local.status() == Status.FINALIZING) {
+ // In cases where an Ensemble Tandem includes multiple Cohorts
+ // with the same leader node, the #2pc will be committed more
+ // than once.
+ return true;
+ }
+ else {
+ return Reflection.call(local, "prepare");
+ }
+ }
+ }
+}
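
To make the two-phase split concrete, the sketch below walks the lifecycle from a coordinator's point of view. The identifier, destination and broker handles are hypothetical stand-ins for whatever the Ensemble framework supplies at runtime, and commit() is assumed to report success the way a standard AtomicOperation does; in practice these calls are driven by the $ensemble* callbacks rather than invoked directly.

    // Phase 1: allocate the participant-side operation, buffer writes and
    // commit, which (per the override of complete()) only grabs locks
    TwoPhaseCommit tpc = TwoPhaseCommit.allocator().allocate(identifier,
            destination, broker);
    tpc.add("name", Convert.javaToThrift("jeff"), 1);
    boolean prepared = tpc.commit();

    // Phase 2: the coordinator either applies or cancels on every participant
    if(prepared) {
        tpc.finish(); // apply the buffered writes at the agreed version
    }
    else {
        tpc.abort(); // release the locks without applying anything
    }
    tpc.deallocate();
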
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java
index 83370f0ea..2a0a6f077 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java
@@ -660,7 +660,7 @@ public Map<Long, Set<TObject>> explore(String key, Aliases aliases) {
Map<Identifier, Set<Value>> map = index.findAndGet(aliases.operator(),
Ks);
return Transformers.transformTreeMapSet(map, Identifier::longValue,
- Value::getTObject, Long::compare);
+ Value::getTObject, Comparators.LONG_COMPARATOR);
}
@Override
@@ -673,7 +673,7 @@ public Map<Long, Set<TObject>> explore(String key, Aliases aliases,
Map<Identifier, Set<Value>> map = index.findAndGet(timestamp,
aliases.operator(), Ks);
return Transformers.transformTreeMapSet(map, Identifier::longValue,
- Value::getTObject, Long::compare);
+ Value::getTObject, Comparators.LONG_COMPARATOR);
}
@Override
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/compaction/Compactor.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/compaction/Compactor.java
index 270d46037..a84ff6652 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/compaction/Compactor.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/compaction/Compactor.java
@@ -24,7 +24,7 @@
import com.cinchapi.concourse.server.storage.db.SegmentStorageSystem;
import com.cinchapi.concourse.server.storage.db.kernel.Segment;
-import com.cinchapi.concourse.time.Time;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -190,7 +190,7 @@ protected final int getShiftIndex() {
*/
@VisibleForTesting
protected final void runShift(int index, int count) {
- String id = Long.toString(Time.now());
+ String id = Long.toString(Identifiers.next());
List<Segment> segments = storage.segments();
int limit = segments.size();
if(segments.get(limit - 1).isMutable()) {
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/temp/Write.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/temp/Write.java
index 66f17d42e..9f32cc452 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/temp/Write.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/temp/Write.java
@@ -21,6 +21,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
+import com.cinchapi.common.base.Array;
import com.cinchapi.common.io.ByteBuffers;
import com.cinchapi.concourse.server.io.ByteSink;
import com.cinchapi.concourse.server.io.Byteable;
@@ -33,6 +34,7 @@
import com.cinchapi.concourse.server.storage.Versioned;
import com.cinchapi.concourse.server.storage.cache.ByteableFunnel;
import com.cinchapi.concourse.thrift.TObject;
+import com.cinchapi.ensemble.CompositeLocator;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -45,7 +47,7 @@
* @author Jeff Nelson
*/
@Immutable
-public final class Write implements Byteable, Versioned {
+public final class Write implements Byteable, CompositeLocator, Versioned {
/**
* Return a storable Write that represents a revision to ADD {@code key} as
@@ -361,6 +363,11 @@ public boolean isStorable() {
return version != NO_VERSION;
}
+ @Override
+ public Object[] locators() {
+ return Array.containing(key.toString(), record.longValue());
+ }
+
/**
* Return {@code true} if this Write and {@code other} have the same
* {@code type} and are equal.
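
Because a Write now exposes its key and record as locators, the distributed layer can presumably place or look up the revision by the data it touches instead of treating it as an opaque blob. A quick illustration using the existing factory method (the array contents follow directly from locators() above):

    Write write = Write.add("name", Convert.javaToThrift("jeff"), 17);
    Object[] locators = write.locators(); // ["name", 17L]
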
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/time/TimeSource.java b/concourse-server/src/main/java/com/cinchapi/concourse/time/TimeSource.java
new file mode 100644
index 000000000..224bad149
--- /dev/null
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/time/TimeSource.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.time;
+
+import java.util.concurrent.TimeUnit;
+
+import com.cinchapi.common.base.AnyObjects;
+import com.cinchapi.ensemble.clock.Clock;
+
+/**
+ * A bridge between a {@link Clock}, which tells {@link Clock#time() time} in
+ * epoch milliseconds, and Concourse's {@link MicrosClock microsecond-precision
+ * time requirements}.
+ * The {@link TimeSource} abstraction allows for
+ *
+ * - correctly getting
+ * {@link #epochMicros() microsecond} timestamps based on the actual time of an
+ * underlying {@link Clock clock}, and
+ *
+ * - {@link #interpret(long) interpreting} timestamps received from the underlying
+ * {@link Clock clock} as a Concourse compatible microsecond timestamp
+ *
+ *
+ *
+ * For interchangeability, each {@link TimeSource} is an extension of a
+ * {@link Clock} and the {@link Clock#time()} method always returns a
+ * clock-native timestamp. Internal Concourse components should use the
+ * {@link #epochMicros()} method to get Concourse timestamps.
+ *
+ *
+ * In cases where a provided timestamp is known to have been generated using the
+ * {@link #time()} method (e.g., in the distributed system framework), internal
+ * Concourse components should first {@link #interpret(long) convert} the
+ * timestamp to microseconds before using it.
+ *
+ *
+ * NOTE: Only one {@link TimeSource} can be active for a given
+ * application runtime. Use {@link TimeSource#get()} to obtain the active
+ * {@link TimeSource}. If none has been explicitly activated, {@link #get()}
+ * will default to the {@link #local() local} {@link TimeSource}.
+ *
+ *
+ * @author Jeff Nelson
+ */
+public abstract class TimeSource implements Clock, MicrosClock {
+
+ /**
+ * Return the canonical {@link TimeSource} for distributed environments.
+ *
+ * In a distributed system, clock drift can cause timestamps to diverge
+ * across nodes, leading to inconsistencies where time may appear to move
+ * backward when events from different nodes are consolidated. To mitigate
+ * this issue, a distributed {@link TimeSource} ensures that time progresses
+ * monotonically, preventing any node from observing non-sequential
+ * timestamps.
+ *
+ *
+ * @return the distributed {@link TimeSource} instance
+ * @throws UnsupportedOperationException if a different time source is
+ * already in use
+ */
+ public static TimeSource distributed() {
+ if(LOCAL != null) {
+ throw new UnsupportedOperationException(
+ "A different time source has already been created");
+ }
+ if(DISTRIBUTED == null) {
+ DISTRIBUTED = new DistributedTimeSource();
+ }
+ return DISTRIBUTED;
+ }
+
+ /**
+ * Return the singleton instance of the {@link TimeSource}.
+ *
+ * @return the {@link TimeSource} instance
+ */
+ public static TimeSource get() {
+ TimeSource instance = AnyObjects.firstThat(s -> s != null, LOCAL,
+ DISTRIBUTED);
+ if(instance == null) {
+ LOCAL = new LocalTimeSource();
+ instance = LOCAL;
+ }
+ return instance;
+ }
+
+ /**
+ * Return the canonical {@link TimeSource} that uses the local system clock.
+ *
+ * @return the local {@link TimeSource} instance
+ * @throws UnsupportedOperationException if a different time source is
+ * already in use
+ */
+ public static TimeSource local() {
+ if(DISTRIBUTED != null) {
+ throw new UnsupportedOperationException(
+ "A different time source has already been created");
+ }
+ if(LOCAL == null) {
+ LOCAL = new LocalTimeSource();
+ }
+ return LOCAL;
+ }
+
+ /**
+ * The singleton instance of local {@link TimeSource}.
+ */
+ private static TimeSource LOCAL = null;
+
+ /**
+ * The singleton instance of distributed {@link TimeSource}.
+ */
+ private static TimeSource DISTRIBUTED = null;
+
+ /**
+ * Construct a new instance.
+ */
+ private TimeSource() {/* no-init */}
+
+ @Override
+ public long epochMicros() {
+ return interpret(time());
+ }
+
+ /**
+ * Interpret a raw timestamp value and convert it to microseconds. Different
+ * implementations may apply different conversion logic based on the origin
+ * of the timestamp.
+ *
+ * @param time the raw timestamp to interpret
+ * @return the interpreted time in microseconds
+ */
+ public abstract long interpret(long time);
+
+ /**
+ * A {@link TimeSource} for distributed environments that uses a
+ * {@link Clock#hybrid(Clock) hybrid logical clock} based on the
+ * {@link Clock#ntp() Network Time Protocol}.
+ *
+ * @author Jeff Nelson
+ */
+ static class DistributedTimeSource extends ForwardingTimeSource {
+
+ /**
+ * Construct a new instance.
+ */
+ DistributedTimeSource() {
+ super(Clock.hybrid(Clock.ntp()));
+
+ // Configure Time utilities to use this time source for timestamp
+ // generation without explicitly calling this TimeSource directly.
+ Time.setClock(this);
+ }
+
+ @Override
+ public long interpret(long time) {
+ // The high order 48-bits of a hybrid timestamp contain the clock
+ // timestamp.
+ long millis = (time >>> 16) & 0xFFFFFFFFFFFFL;
+ return TimeUnit.MICROSECONDS.convert(millis, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ /**
+ * A {@link TimeSource} implementation that uses the local system time.
+ *
+ *
+ * This {@link TimeSource} relies on the default
+ * {@link com.cinchapi.concourse.time.Time} utilities and assumes that any
+ * timestamps it {@link #interpret(long) interprets} were directly or
+ * indirectly generated by those utilities.
+ *
+ * @author Jeff Nelson
+ */
+ static class LocalTimeSource extends TimeSource {
+
+ @Override
+ public long interpret(long time) {
+ return time;
+ }
+
+ @Override
+ public long time() {
+ return Time.now();
+ }
+
+ }
+
+ /**
+ * A {@link TimeSource} that forwards time requests to another {@link Clock}
+ * implementation, but may {@link #interpret(long) interpret} timestamps
+ * generated by that clock differently for internal compatibility.
+ *
+ * @author Jeff Nelson
+ */
+ private static abstract class ForwardingTimeSource extends TimeSource {
+
+ /**
+ * The underlying {@link Clock} that provides the time.
+ */
+ protected final Clock clock;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param clock
+ */
+ ForwardingTimeSource(Clock clock) {
+ this.clock = clock;
+ }
+
+ @Override
+ public final void sync(long time) {
+ clock.sync(time);
+ }
+
+ @Override
+ public final long time() {
+ return clock.time();
+ }
+ }
+
+}
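
In practice, internal components ask the active TimeSource for microsecond timestamps and only reach for interpret(long) when a raw clock value arrives from the distributed framework. A minimal sketch (which concrete source get() returns depends on whether local() or distributed() was activated first):

    TimeSource source = TimeSource.get();

    // Concourse-compatible microsecond timestamp
    long micros = source.epochMicros();

    // A clock-native value (e.g., one received from another node) must be
    // interpreted before it is compared against Concourse timestamps; for the
    // distributed source this keeps the upper 48 bits (wall-clock millis) and
    // scales them to microseconds
    long raw = source.time();
    long comparable = source.interpret(raw);
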
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/util/Comparators.java b/concourse-server/src/main/java/com/cinchapi/concourse/util/Comparators.java
index 337d5fd92..d1856ed15 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/util/Comparators.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/util/Comparators.java
@@ -15,6 +15,7 @@
*/
package com.cinchapi.concourse.util;
+import java.io.Serializable;
import java.util.Comparator;
import com.google.common.collect.Ordering;
@@ -36,8 +37,9 @@ public final class Comparators {
*
* @return the comparator
*/
+ @SuppressWarnings("unchecked")
public static <T> Comparator<T> equalOrArbitrary() {
- return (o1, o2) -> {
+ return (Comparator<T> & Serializable) (o1, o2) -> {
if(o1 == o2 || o1.equals(o2)) {
return 0;
}
@@ -56,7 +58,7 @@ public static Comparator equalOrArbitrary() {
*/
@SuppressWarnings("unchecked")
public static <T> Comparator<T> naturalOrArbitrary() {
- return (o1, o2) -> {
+ return (Comparator<T> & Serializable) (o1, o2) -> {
if(o1 instanceof Comparable) {
return ((Comparable) o1).compareTo(o2);
@@ -68,12 +70,41 @@ public static Comparator naturalOrArbitrary() {
};
}
+ /**
+ * Return a {@link Comparator} that sorts {@link Long} values.
+ *
+ * @return the {@link Comparator}
+ */
+ @SuppressWarnings("unchecked")
+ private static final Comparator<Long> longComparator() {
+ return (Comparator<Long> & Serializable) (o1, o2) -> o1.compareTo(o2);
+ }
+
+ /**
+ * Return a {@link Comparator} that sorts {@link String Strings} in a case
+ * insensitive manner.
+ *
+ * @return the comparator
+ */
+ @SuppressWarnings("unchecked")
+ private static final Comparator<String> stringIgnoreCase() {
+ return (Comparator<String> & Serializable) (s1, s2) -> s1
+ .compareToIgnoreCase(s2);
+ }
+
/**
* A comparator that sorts strings lexicographically without regards to
* case.
*/
- public final static Comparator<String> CASE_INSENSITIVE_STRING_COMPARATOR = (
- s1, s2) -> s1.compareToIgnoreCase(s2);
+ // NOTE: In order for this to be serializable, it must be set to the value
+ // returned from #stringIgnoreCase(), which casts the lambda as
+ // Serializable
+ public final static Comparator<String> CASE_INSENSITIVE_STRING_COMPARATOR = stringIgnoreCase();
+
+ /**
+ * A {@link Comparator} that sorts {@link Long} values.
+ */
+ public final static Comparator<Long> LONG_COMPARATOR = longComparator();
private Comparators() {/* noop */}
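
The intersection cast is what makes these lambdas serializable, which matters now that comparators can accompany data structures that are replicated across the cluster. A small standalone check of the idea (plain JDK serialization, nothing Concourse-specific; the checked IOException is left to the caller for brevity):

    Comparator<String> cmp = (Comparator<String> & Serializable) String::compareToIgnoreCase;
    ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
    out.writeObject(cmp); // succeeds; without the Serializable cast this line
                          // would throw java.io.NotSerializableException
    out.close();
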
diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/util/TestData.java b/concourse-server/src/main/java/com/cinchapi/concourse/util/TestData.java
index 055be5de3..4a2d41717 100644
--- a/concourse-server/src/main/java/com/cinchapi/concourse/util/TestData.java
+++ b/concourse-server/src/main/java/com/cinchapi/concourse/util/TestData.java
@@ -35,7 +35,6 @@
import com.cinchapi.concourse.server.storage.db.TableRevision;
import com.cinchapi.concourse.server.storage.temp.Write;
import com.cinchapi.concourse.thrift.TObject;
-import com.cinchapi.concourse.time.Time;
/**
* A utility class for getting test data.
@@ -52,7 +51,7 @@ public final class TestData extends Random {
* @return the file path
*/
public static String getTemporaryTestFile() {
- return DATA_DIR + File.separator + Time.now() + ".tmp";
+ return DATA_DIR + File.separator + Identifiers.next() + ".tmp";
}
/**
@@ -61,7 +60,7 @@ public static String getTemporaryTestFile() {
* @return the directory path
*/
public static String getTemporaryTestDir() {
- return DATA_DIR + File.separator + Time.now();
+ return DATA_DIR + File.separator + Identifiers.next();
}
public static TableRevision getPrimaryRevision() {
diff --git a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/AtomicOperationTest.java b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/AtomicOperationTest.java
index 8507f97ac..b5607186a 100644
--- a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/AtomicOperationTest.java
+++ b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/AtomicOperationTest.java
@@ -31,6 +31,7 @@
import com.cinchapi.concourse.thrift.TObject;
import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.Convert;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.TestData;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Sets;
@@ -310,7 +311,7 @@ public void run() {
public void testNoDeadLockIfFindEqOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.EQUALS, value);
operation.add(key, value, record);
@@ -321,7 +322,7 @@ long record = Time.now();
public void testNoDeadLockIfFindGtOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.GREATER_THAN, value);
operation.add(key, value, record);
@@ -332,7 +333,7 @@ long record = Time.now();
public void testNoDeadLockIfFindGteOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.GREATER_THAN_OR_EQUALS, value);
operation.add(key, value, record);
@@ -343,7 +344,7 @@ long record = Time.now();
public void testNoDeadLockIfFindLteOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.LESS_THAN_OR_EQUALS, value);
operation.add(key, value, record);
@@ -354,7 +355,7 @@ long record = Time.now();
public void testNoDeadLockIfFindLtOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.LESS_THAN, value);
operation.add(key, value, record);
@@ -365,7 +366,7 @@ long record = Time.now();
public void testNoDeadLockIfFindBwOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.BETWEEN, value, Convert.javaToThrift(3));
operation.add(key, value, record);
@@ -376,7 +377,7 @@ long record = Time.now();
public void testNoDeadLockIfFindRegexOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.REGEX, value);
operation.add(key, value, record);
@@ -387,7 +388,7 @@ long record = Time.now();
public void testNoDeadLockIfFindNotRegexOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.NOT_REGEX, value);
operation.add(key, value, record);
diff --git a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/DistributedStoreTest.java b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/DistributedStoreTest.java
new file mode 100644
index 000000000..67199ecce
--- /dev/null
+++ b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/DistributedStoreTest.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2013-2025 Cinchapi Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cinchapi.concourse.server.storage;
+
+/**
+ * Unit tests for distributed storage behavior.
+ *
+ * @author jeff
+ */
+public class DistributedStoreTest {
+
+}
diff --git a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineAtomicOperationTest.java b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineAtomicOperationTest.java
index f61797484..24668f44e 100644
--- a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineAtomicOperationTest.java
+++ b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineAtomicOperationTest.java
@@ -35,6 +35,7 @@
import com.cinchapi.concourse.thrift.TObject;
import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.Convert;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.TestData;
import com.google.common.collect.Lists;
@@ -62,7 +63,7 @@ protected void starting(Description desc) {
public void testNoDeadLockIfFindNotRegexOnKeyBeforeAddingToKey() {
String key = "ipeds_id";
TObject value = Convert.javaToThrift(1);
- long record = Time.now();
+ long record = Identifiers.next();
AtomicOperation operation = (AtomicOperation) store;
operation.find(key, Operator.NOT_REGEX, value);
operation.add(key, value, record);
diff --git a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineTest.java b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineTest.java
index 57e73e339..e102177b9 100644
--- a/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineTest.java
+++ b/concourse-server/src/test/java/com/cinchapi/concourse/server/storage/EngineTest.java
@@ -42,6 +42,7 @@
import com.cinchapi.concourse.thrift.TObject;
import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.Convert;
+import com.cinchapi.concourse.util.Identifiers;
import com.cinchapi.concourse.util.Random;
import com.cinchapi.concourse.util.TestData;
import com.google.common.collect.ImmutableSet;
@@ -154,7 +155,8 @@ public void testReproGH_441() throws Exception {
buffer.insert(Write.remove("name", Convert.javaToThrift("jeff"), 2));
while (!(boolean) method.invoke(buffer)) { // Fill the page so the
// buffer can transport
- engine.add("count", Convert.javaToThrift(Time.now()), Time.now());
+ engine.add("count", Convert.javaToThrift(Time.now()),
+ Identifiers.next());
}
for (int i = 0; i < 6; ++i) {
buffer.transport(db);
@@ -197,7 +199,8 @@ public void testReproGH_442() throws Exception {
buffer.insert(Write.add("name", Convert.javaToThrift("jeff"), 2));
while (!(boolean) method.invoke(buffer)) { // Fill the page so the
// buffer can transport
- engine.add("count", Convert.javaToThrift(Time.now()), Time.now());
+ engine.add("count", Convert.javaToThrift(Time.now()),
+ Identifiers.next());
}
for (int i = 0; i < 4; ++i) {
buffer.transport(db);
@@ -225,10 +228,10 @@ public void testBrowseKeyIsSortedBetweenDatabaseAndBuffer() {
"Yale University", "Harvard University");
for (String college : colleges) {
engine.durable.accept(Write.add("name",
- Convert.javaToThrift(college), Time.now()));
+ Convert.javaToThrift(college), Identifiers.next()));
}
- engine.limbo.insert(
- Write.add("name", Convert.javaToThrift("jeffery"), Time.now()));
+ engine.limbo.insert(Write.add("name", Convert.javaToThrift("jeffery"),
+ Identifiers.next()));
Set<TObject> keys = engine.browse("name").keySet();
Assert.assertEquals(Convert.javaToThrift("Boston College"),
Iterables.get(keys, 0));
@@ -323,7 +326,7 @@ public void run() {
if(!done.get()) {
engine.add("foo",
Convert.javaToThrift(Long.toString(Time.now())),
- Time.now());
+ Identifiers.next());
}
}
}
@@ -359,11 +362,11 @@ public void reproCON_516() {
Buffer buffer = (Buffer) engine.limbo;
int count = 0;
while (!(boolean) Reflection.call(buffer, "canTransport")) {
- add("name", Convert.javaToThrift("Jeff"), Time.now());
+ add("name", Convert.javaToThrift("Jeff"), Identifiers.next());
count++;
}
buffer.transport(engine.durable);
- add("name", Convert.javaToThrift("Jeff"), Time.now());
+ add("name", Convert.javaToThrift("Jeff"), Identifiers.next());
count++;
Set<Long> matches = engine.find("name", Operator.EQUALS,
Convert.javaToThrift("jeff"));
@@ -427,7 +430,7 @@ public void testReproCON_668() throws Exception {
add("major", Convert.javaToThrift("business"), 2);
Engine engine = (Engine) store;
while (!Reflection.<Boolean> call(engine.limbo, "canTransport")) { // authorized
- add("foo", Convert.javaToThrift(Time.now()), Time.now());
+ add("foo", Convert.javaToThrift(Time.now()), Identifiers.next());
}
while (Reflection.<Boolean> call(engine.limbo, "canTransport")) { // authorized
engine.limbo.transport(engine.durable);
@@ -455,7 +458,7 @@ public void testReproCON_667() throws Exception {
add("major", Convert.javaToThrift("business"), 2);
Engine engine = (Engine) store;
while (!Reflection.<Boolean> call(engine.limbo, "canTransport")) { // authorized
- add("foo", Convert.javaToThrift(Time.now()), Time.now());
+ add("foo", Convert.javaToThrift(Time.now()), Identifiers.next());
}
while (Reflection.<Boolean> call(engine.limbo, "canTransport")) { // authorized
engine.limbo.transport(engine.durable);
@@ -498,7 +501,7 @@ public void testCommitVersionSplitBetweenBufferAndDatabase() {
atomic.commit();
long after = Time.now();
while (!Reflection.<Boolean> call(buffer, "canTransport")) {
- engine.add("foo", Convert.javaToThrift("bar"), Time.now());
+ engine.add("foo", Convert.javaToThrift("bar"), Identifiers.next());
}
// The buffer should transport at least 1 write, which would be the
@@ -558,7 +561,9 @@ public void testSameWriteVersionDatabaseIntersectionDetection()
int writes = TestData.getScaleCount();
for (int j = 0; j < writes; ++j) {
atomic.add("name", Convert.javaToThrift("jeff" + i),
- Math.abs(TestData.getInt()) % 2 == 0 ? Time.now() : j);
+ Math.abs(TestData.getInt()) % 2 == 0
+ ? Identifiers.next()
+ : j);
expected.incrementAndGet();
}
atomic.commit();
diff --git a/utils/configctl/configctl b/utils/configctl/configctl
new file mode 100755
index 000000000..751d191c7
Binary files /dev/null and b/utils/configctl/configctl differ
diff --git a/utils/start-cluster.sh b/utils/start-cluster.sh
new file mode 100755
index 000000000..ad5a03b10
--- /dev/null
+++ b/utils/start-cluster.sh
@@ -0,0 +1,280 @@
+#!/usr/bin/env bash
+
+# Copyright (c) 2013-2025 Cinchapi Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Normalize working directory, load shared functions, etc.
+. "$(dirname "$0")/.include"
+
+# Exit immediately if a command exits with a non-zero status,
+# and treat unset variables as an error.
+set -euo pipefail
+
+###############################################################################
+# Usage information
+###############################################################################
+usage() {
+ echo "Usage: $0 --nodes [--nodes ] [--rf ] [--clean]"
+ echo " --nodes, --node, -n Specify one or more ports on which the nodes will run."
+ echo " You can provide a comma-separated list or repeat the flag."
+ echo " --rf Optional: Set the replication factor. Default: (#nodes/2)+1"
+ echo " --clean Force generation of a new installer even if one exists."
+ echo " -h, --help Display this help and exit."
+ exit 1
+}
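+
+# Example (hypothetical ports): start a three-node cluster on client ports
+# 1717, 1718 and 1719 with an explicit replication factor of 2:
+#
+#   ./start-cluster.sh --nodes 1717,1718,1719 --rf 2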
+
+###############################################################################
+# Parse command-line arguments
+###############################################################################
+nodes=()
+rf=""
+clean=""
+
+while [[ $# -gt 0 ]]; do
+ case "$1" in
+ --nodes|--node|-n)
+ shift
+ if [[ -z "${1:-}" ]]; then
+ echo "Error: Missing argument for --nodes"
+ usage
+ fi
+ IFS=',' read -ra PORTS <<< "$1"
+ for p in "${PORTS[@]}"; do
+ nodes+=("$p")
+ done
+ ;;
+ --rf)
+ shift
+ if [[ -z "${1:-}" ]]; then
+ echo "Error: Missing argument for --rf"
+ usage
+ fi
+ rf="$1"
+ ;;
+ --clean)
+ clean=true
+ ;;
+ -h|--help)
+ usage
+ ;;
+ *)
+ echo "Unknown option: $1"
+ usage
+ ;;
+ esac
+ shift
+done
+
+if [ "${#nodes[@]}" -eq 0 ]; then
+ echo "Error: You must supply at least one --nodes argument."
+ usage
+fi
+
+# Set default replication factor if not provided: (#nodes / 2) + 1
+if [ -z "$rf" ]; then
+ count=${#nodes[@]}
+ rf=$(( (count / 2) + 1 ))
+fi
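+# With integer division this yields a simple majority, e.g. 3 nodes -> rf = (3 / 2) + 1 = 2,
+# and 5 nodes -> rf = (5 / 2) + 1 = 3.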
+
+# Define color and formatting codes
+BOLD='\033[1m'
+GREEN='\033[32m'
+RESET='\033[0m'
+
+# Announce the cluster configuration in color and bold
+echo -e "${BOLD}${GREEN}Launching cluster with nodes on ports: ${nodes[*]}${RESET}"
+echo -e "${BOLD}${GREEN}Replication Factor: $rf${RESET}"
+
+###############################################################################
+# Path to the configctl command (relative to this script, whose working directory is normalized above)
+###############################################################################
+CONFIGCTL="configctl/configctl"
+
+###############################################################################
+# Determine installer status (using a .bin installer)
+###############################################################################
+installer_dir="../concourse-server/build/distributions"
+if [ -d "$installer_dir" ]; then
+ existing_installer=$(find "$installer_dir" -maxdepth 1 -type f -name "concourse-server*.bin" | head -n 1)
+else
+ existing_installer=""
+fi
+
+if [ -z "${clean:-}" ] && [ -n "$existing_installer" ]; then
+ installer_sh="$existing_installer"
+ echo "Using existing installer: $installer_sh"
+else
+ echo -e "${BOLD}${GREEN}Building installer...${RESET}"
+ pushd ".." > /dev/null
+ ./gradlew clean installer
+ popd > /dev/null
+
+ installer_sh=$(find "$installer_dir" -maxdepth 1 -type f -name "concourse-server*.bin" | head -n 1)
+ if [ -z "$installer_sh" ]; then
+ echo "Error: Installer not found in $installer_dir after building."
+ exit 1
+ fi
+ echo -e "${BOLD}${GREEN}Installer built: $installer_sh${RESET}"
+fi
+
+###############################################################################
+# Helper functions and variables (Bash 3.x compatible using indexed arrays)
+###############################################################################
+node_dirs=() # Array to hold temp directories per node index
+tail_pids=() # Array to hold tail process PIDs per node index
+
+# Function to find an available port (requires lsof; uses shuf or jot)
+get_free_port() {
+ local port
+ while :; do
+ if command -v shuf >/dev/null 2>&1; then
+ port=$(shuf -i 2000-65000 -n 1)
+ else
+ port=$(jot -r 1 2000 65000)
+ fi
+ if ! lsof -i :"$port" &>/dev/null; then
+ echo "$port"
+ return
+ fi
+ done
+}
+
+# Build the full cluster nodes list (each as localhost:port)
+cluster_nodes=()
+for port in "${nodes[@]}"; do
+ cluster_nodes+=("localhost:$port")
+done
+
+###############################################################################
+# Cleanup function to stop nodes, kill log tails, and remove temporary directories
+###############################################################################
+cleanup() {
+ echo -e "\nStopping all nodes..."
+ for (( i=0; i<${#nodes[@]}; i++ )); do
+ node_port="${nodes[$i]}"
+ tmp_dir="${node_dirs[$i]}"
+ echo "Stopping node running on port $node_port..."
+ if [ -x "$tmp_dir/concourse-server/bin/concourse" ]; then
+ (cd "$tmp_dir/concourse-server/bin" && ./concourse stop)
+ fi
+ done
+
+ echo "Terminating log tail processes..."
+ for (( i=0; i<${#tail_pids[@]}; i++ )); do
+ kill "${tail_pids[$i]}" 2>/dev/null || true
+ done
+
+ echo "Cleaning up temporary directories..."
+ for (( i=0; i<${#node_dirs[@]}; i++ )); do
+ rm -rf "${node_dirs[$i]}"
+ done
+
+ exit 0
+}
+trap cleanup SIGINT
+
+###############################################################################
+# Loop to create, configure, and start each node
+###############################################################################
+# Arrays to store debug and JMX ports for the summary
+debug_ports=()
+jmx_ports=()
+
+for (( i=0; i<${#nodes[@]}; i++ )); do
+ node_port="${nodes[$i]}"
+ echo -e "${BOLD}${GREEN}Setting up node on port $node_port...${RESET}"
+ tmp_dir=$(mktemp -d "${TMPDIR:-/tmp}/concourse_node.XXXXXX")
+ node_dirs[$i]="$tmp_dir"
+ echo -e "${BOLD}${GREEN}Node directory: $tmp_dir${RESET}"
+
+ # Copy installer and run it to create the node installation (creates concourse-server/)
+ cp "$installer_sh" "$tmp_dir/"
+ (cd "$tmp_dir" && sh concourse-server*bin -- skip-integration)
+
+ # The configuration file is now under concourse-server/conf/concourse.yaml
+ config_file="$tmp_dir/concourse-server/conf/concourse.yaml"
+ if [ ! -f "$config_file" ]; then
+ echo "Error: Config file not found at $config_file"
+ exit 1
+ fi
+
+ # Set the cluster nodes list in the config (each node gets the complete list)
+ for (( j=0; j<${#cluster_nodes[@]}; j++ )); do
+ address="${cluster_nodes[$j]}"
+ "$CONFIGCTL" write -k "cluster.nodes.$j" -v "$address" -f "$config_file"
+ done
+
+ "$CONFIGCTL" write -k "cluster.replication_factor" -v "$rf" -f "$config_file"
+
+ # Select a free port for remote debugging and set it
+ debugger_port=$(get_free_port)
+ jmx_port=$(get_free_port)
+ "$CONFIGCTL" write -k "remote_debugger_port" -v "$debugger_port" -f "$config_file"
+ "$CONFIGCTL" write -k "jmx_port" -v "$jmx_port" -f "$config_file"
+ echo -e "${BOLD}${GREEN}Node on port $node_port will listen for remote debugging on port $debugger_port${RESET}"
+
+ "$CONFIGCTL" write -k "log_level" -v "DEBUG" -f "$config_file"
+ "$CONFIGCTL" write -k "client_port" -v "$node_port" -f "$config_file"
+
+ # Prepare full paths for buffer and DB directories (create if necessary)
+ mkdir -p "$tmp_dir/data/buffer" "$tmp_dir/data/db"
+ "$CONFIGCTL" write -k "buffer_directory" -v "$(realpath "$tmp_dir/data/buffer")" -f "$config_file"
+ "$CONFIGCTL" write -k "database_directory" -v "$(realpath "$tmp_dir/data/db")" -f "$config_file"
+
+ # Start the node (it runs as a daemon)
+ (cd "$tmp_dir/concourse-server/bin" && ./concourse start)
+
+ # Tail all log files in the directory
+ echo "Looking for log files in: $tmp_dir/concourse-server/log"
+ while IFS= read -r log_file; do
+ if [ -n "$log_file" ]; then
+ echo "Found log file: $log_file"
+ # Extract the log file name for the prefix
+ log_name=$(basename "$log_file")
+ tail -f "$log_file" | sed "s/^/[Node $node_port - $log_name] /" &
+ tail_pids+=($!)
+ fi
+ done < <(find "$tmp_dir/concourse-server/log" -type f -name "*.log")
+
+ if [ ${#tail_pids[@]} -eq 0 ]; then
+ echo "Warning: No log files found for node on port $node_port"
+ ls -la "$tmp_dir/concourse-server/log" || echo "Log directory does not exist or is not accessible"
+ fi
+
+ # Store debug and jmx ports for summary
+ debug_ports[$i]=$debugger_port
+ jmx_ports[$i]=$jmx_port
+done
+
+###############################################################################
+# Print summary table
+###############################################################################
+# Function to print the summary display
+print_summary() {
+ echo -e "\n${BOLD}${GREEN}Cluster Node Summary:${RESET}"
+ printf "%-15s %-15s %-15s %s\n" "PORT" "DEBUG PORT" "JMX PORT" "DIRECTORY"
+ printf "%-15s %-15s %-15s %s\n" "----" "----------" "--------" "---------"
+ for (( i=0; i<${#nodes[@]}; i++ )); do
+ printf "%-15s %-15s %-15s %s\n" "${nodes[$i]}" "${debug_ports[$i]}" "${jmx_ports[$i]}" "${node_dirs[$i]}"
+ done
+ echo -e "\nPress Ctrl+C to stop all nodes and clean up.\n"
+}
+
+# Print the initial summary
+print_summary
+
+# Wait indefinitely so that the trap can capture Ctrl+C
+while true; do sleep 1; done
+
+exit 0