diff --git a/CHANGES.txt b/CHANGES.txt index 538baacb3..fde69404d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ 0.3.0 ----- + * Update OperationalJob class to support cluster-wide operations (CASSSIDECAR-376) * RangeManager should be singleton in CDCModule (CASSSIDECAR-411) * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308) * SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java index 271f398a8..3de8cef0e 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.common.response; +import java.util.List; import java.util.UUID; import com.fasterxml.jackson.annotation.JsonCreator; @@ -37,17 +38,35 @@ public class OperationalJobResponse private final OperationalJobStatus status; private final String operation; private final String reason; + private final String startTime; + private final List nodesPending; + private final List nodesExecuting; + private final List nodesSucceeded; + private final List nodesFailed; + private final String lastUpdate; @JsonCreator public OperationalJobResponse(@JsonProperty("jobId") UUID jobId, @JsonProperty("jobStatus") OperationalJobStatus status, @JsonProperty("operation") String operation, - @JsonProperty("reason") String reason) + @JsonProperty("reason") String reason, + @JsonProperty("startTime") String startTime, + @JsonProperty("nodesPending") List nodesPending, + @JsonProperty("nodesExecuting") List nodesExecuting, + @JsonProperty("nodesSucceeded") List nodesSucceeded, + @JsonProperty("nodesFailed") List nodesFailed, + @JsonProperty("lastUpdate") String lastUpdate) { this.jobId = jobId; this.status = status; this.operation = operation; this.reason = reason; + this.startTime = startTime; + this.nodesPending = nodesPending; + this.nodesExecuting = nodesExecuting; + this.nodesSucceeded = nodesSucceeded; + this.nodesFailed = nodesFailed; + this.lastUpdate = lastUpdate; } /** @@ -86,4 +105,57 @@ public String reason() return reason; } + /** + * @return the time the job execution started + */ + @JsonProperty("startTime") + public String startTime() + { + return startTime; + } + + /** + * @return list of node IDs pending execution + */ + @JsonProperty("nodesPending") + public List nodesPending() + { + return nodesPending; + } + + /** + * @return list of node IDs currently executing + */ + @JsonProperty("nodesExecuting") + public List nodesExecuting() + { + return nodesExecuting; + } + + /** + * @return list of node IDs that have succeeded + */ + @JsonProperty("nodesSucceeded") + public List nodesSucceeded() + { + return nodesSucceeded; + } + + /** + * @return list of node IDs that have failed + */ + @JsonProperty("nodesFailed") + public List nodesFailed() + { + return nodesFailed; + } + + /** + * @return a human-readable status message + */ + @JsonProperty("lastUpdate") + public String lastUpdate() + { + return lastUpdate; + } } diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index 5566342d6..540e3ac8d 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -71,6 +71,8 @@ protected void beforeTestStart() @Test void testNodeDrainOperationSuccess() { + String expectedHostId = getRingEntryForNode("localhost").hostId(); + // Initiate drain operation HttpResponse drainResponse = getBlocking( trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_DRAIN_ROUTE) @@ -102,7 +104,7 @@ void testNodeDrainOperationSuccess() // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); - validateOperationalJobStatus(jobId, "drain", OperationalJobStatus.SUCCEEDED); + validateOperationalJobStatus(jobId, "drain", OperationalJobStatus.SUCCEEDED, expectedHostId); } @@ -114,7 +116,7 @@ void testNodeMoveOperationSuccess() String requestBody = "{\"newToken\":\"" + testToken + "\"}"; // Validate that the node owns a different token than testToken - String currentToken = getCurrentTokenForNode("localhost"); + String currentToken = getRingEntryForNode("localhost").token(); assertThat(currentToken).isNotEqualTo(testToken); // Initiate move operation @@ -151,10 +153,11 @@ void testNodeMoveOperationSuccess() // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); - validateOperationalJobStatus(jobId, "move", OperationalJobStatus.SUCCEEDED); + String expectedHostId = getRingEntryForNode("localhost").hostId(); + validateOperationalJobStatus(jobId, "move", OperationalJobStatus.SUCCEEDED, expectedHostId); // Validate that the node actually owns the new token - currentToken = getCurrentTokenForNode("localhost"); + currentToken = getRingEntryForNode("localhost").token(); assertThat(currentToken).isEqualTo(testToken); } @@ -175,11 +178,11 @@ void testNodeMoveOperationSuccess() void testNodeMoveOperationFailure() { // Get a token already owned by a node - String testToken = getCurrentTokenForNode("localhost2"); + String testToken = getRingEntryForNode("localhost2").token(); String requestBody = "{\"newToken\":\"" + testToken + "\"}"; // Validate that the node owns a different token than testToken - String initialToken = getCurrentTokenForNode("localhost"); + String initialToken = getRingEntryForNode("localhost").token(); assertThat(initialToken).isNotEqualTo(testToken); // Initiate move operation @@ -216,36 +219,32 @@ void testNodeMoveOperationFailure() // Validate the operational job status using the OperationalJobHandler String jobId = responseBody.getString("jobId"); - validateOperationalJobStatus(jobId, "move", OperationalJobStatus.FAILED); + String expectedHostId = getRingEntryForNode("localhost").hostId(); + validateOperationalJobStatus(jobId, "move", OperationalJobStatus.FAILED, expectedHostId); // Validate that the node didn't move - String currentToken = getCurrentTokenForNode("localhost"); + String currentToken = getRingEntryForNode("localhost").token(); assertThat(currentToken).isEqualTo(initialToken); assertThat(currentToken).isNotEqualTo(testToken); } /** - * Gets the current token for the specified node by querying the ring endpoint. + * Gets the ring entry for the specified node by querying the ring endpoint. * - * @param node the node hostname to get the token for - * @return the token currently owned by the specified node + * @param node the node hostname to look up + * @return the {@link RingEntry} for the specified node */ - private String getCurrentTokenForNode(String node) + private RingEntry getRingEntryForNode(String node) { HttpResponse ringResponse = getBlocking( trustedClient().get(serverWrapper.serverPort, node, ApiEndpointsV1.RING_ROUTE) .send()); - assertThat(ringResponse.statusCode()).isEqualTo(OK.code()); - RingResponse ring = ringResponse.bodyAsJson(RingResponse.class); - assertThat(ring).isNotNull(); - - RingEntry ringEntry = ring.stream() - .filter(entry -> entry.fqdn().equals(node)) - .findFirst() - .orElseThrow(() -> new AssertionError("Node " + node + " not found in ring")); - return ringEntry.token(); + return ring.stream() + .filter(entry -> entry.fqdn().equals(node)) + .findFirst() + .orElseThrow(() -> new AssertionError("Node " + node + " not found in ring")); } /** @@ -254,8 +253,11 @@ private String getCurrentTokenForNode(String node) * * @param jobId the ID of the operational job to validate * @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain") + * @param expectedEndStatus the expected final status of the job + * @param expectedHostId the expected Cassandra host ID in the node tracking lists */ - private void validateOperationalJobStatus(String jobId, String expectedOperation, OperationalJobStatus expectedEndStatus) + private void validateOperationalJobStatus(String jobId, String expectedOperation, + OperationalJobStatus expectedEndStatus, String expectedHostId) { String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); @@ -270,9 +272,14 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); - // If the job is still running, wait for it to complete or reach a final state + // If the job is still running, verify node tracking lists reflect the executing state if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) { + assertThat(jobStatusBody.getJsonArray("nodesPending")).isEmpty(); + assertThat(jobStatusBody.getJsonArray("nodesExecuting")).containsExactly(expectedHostId); + assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).isEmpty(); + assertThat(jobStatusBody.getJsonArray("nodesFailed")).isEmpty(); + loopAssert(30, 500, () -> { HttpResponse finalJobStatusResponse = getBlocking( trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) @@ -300,6 +307,21 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedEndStatus.name()); + assertThat(jobStatusBody.getString("startTime")).isNotNull(); + assertThat(jobStatusBody.getJsonArray("nodesPending")).isEmpty(); + assertThat(jobStatusBody.getJsonArray("nodesExecuting")).isEmpty(); + if (expectedEndStatus == OperationalJobStatus.SUCCEEDED) + { + assertThat(jobStatusBody.getString("lastUpdate")).contains("completed"); + assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).containsExactly(expectedHostId); + assertThat(jobStatusBody.getJsonArray("nodesFailed")).isEmpty(); + } + else if (expectedEndStatus == OperationalJobStatus.FAILED) + { + assertThat(jobStatusBody.getString("lastUpdate")).contains("failed"); + assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).isEmpty(); + assertThat(jobStatusBody.getJsonArray("nodesFailed")).containsExactly(expectedHostId); + } } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java index 9ce60c426..738a9424b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import javax.management.Notification; @@ -520,6 +521,25 @@ public boolean isJmxUp() return nodeSettingsFromJmx != null; } + /** + * @return the Cassandra host UUID fetched via JMX + * @throws CassandraUnavailableException when JMX is not available + */ + @NotNull + public UUID hostId() throws CassandraUnavailableException + { + try + { + LimitedStorageOperations storageOperations = + jmxClient.proxy(LimitedStorageOperations.class, STORAGE_SERVICE_OBJ_NAME); + return UUID.fromString(storageOperations.getLocalHostId()); + } + catch (RuntimeException e) + { + throw new CassandraUnavailableException(JMX, e); + } + } + public void close() { closed = true; @@ -689,6 +709,11 @@ public interface LimitedStorageOperations * @return a collection of tokens formatted as strings */ List getTokens(); + + /** + * @return the local host ID for this Cassandra node + */ + String getLocalHostId(); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandler.java index 484039e5c..1c80b0b39 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandler.java @@ -35,8 +35,6 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.jetbrains.annotations.NotNull; -import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; - /** * Handler for retrieving the all the jobs running on the sidecar */ @@ -72,7 +70,11 @@ protected void handleInternal(RoutingContext context, HttpServerRequest httpRequ ListOperationalJobsResponse listResponse = new ListOperationalJobsResponse(); jobManager.allInflightJobs() .stream() - .map(job -> new OperationalJobResponse(job.jobId(), RUNNING, job.name(), null)) + .map(job -> new OperationalJobResponse(job.jobId(), job.status(), job.name(), null, + job.formattedStartTime(), + job.nodesPending(), job.nodesExecuting(), + job.nodesSucceeded(), job.nodesFailed(), + job.lastUpdate())) .forEach(listResponse::addJob); context.json(listResponse); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java index d2c522eb9..56a31cd7c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Set; +import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; @@ -28,6 +29,7 @@ import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; @@ -83,8 +85,10 @@ public void handleInternal(RoutingContext context, SocketAddress remoteAddress, Boolean isForce) { - StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); - NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), operations, isForce); + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + StorageOperations operations = delegate.storageOperations(); + UUID nodeId = delegate.hostId(); + NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), nodeId, operations, isForce); this.jobManager.trySubmitJob(job, (completedJob, exception) -> OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java index b0de67532..7f5016564 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Set; +import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; @@ -28,6 +29,7 @@ import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; @@ -81,8 +83,10 @@ public void handleInternal(RoutingContext context, SocketAddress remoteAddress, Void unused) { - StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); - NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), operations); + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + StorageOperations operations = delegate.storageOperations(); + UUID nodeId = delegate.hostId(); + NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), nodeId, operations); this.jobManager.trySubmitJob(job, (completedJob, exception) -> OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java index c76798695..e1e81faff 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Set; +import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; @@ -32,6 +33,7 @@ import io.vertx.ext.auth.authorization.Authorization; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.utils.StringUtils; @@ -99,8 +101,10 @@ public void handleInternal(RoutingContext context, SocketAddress remoteAddress, String newToken) { - StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); - NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + StorageOperations operations = delegate.storageOperations(); + UUID nodeId = delegate.hostId(); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), nodeId, newToken, operations); this.jobManager.trySubmitJob(job, (completedJob, exception) -> OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java index 778ea450b..d4f3937f0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java @@ -39,9 +39,9 @@ public class NodeDecommissionJob extends OperationalJob private final boolean isForce; protected StorageOperations storageOperations; - public NodeDecommissionJob(UUID jobId, StorageOperations storageOps, boolean isForce) + public NodeDecommissionJob(UUID jobId, UUID nodeId, StorageOperations storageOps, boolean isForce) { - super(jobId); + super(jobId, nodeId); this.storageOperations = storageOps; this.isForce = isForce; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java index 16fe02e2b..159959c20 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java @@ -77,9 +77,9 @@ public static NodeDrainStateEnum fromOperationMode(String operationMode) } } - public NodeDrainJob(UUID jobId, StorageOperations storageOps) + public NodeDrainJob(UUID jobId, UUID nodeId, StorageOperations storageOps) { - super(jobId); + super(jobId, nodeId); this.storageOperations = storageOps; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java index d0690095f..f4613231c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java @@ -41,9 +41,9 @@ public class NodeMoveJob extends OperationalJob private final String newToken; protected StorageOperations storageOperations; - public NodeMoveJob(UUID jobId, String newToken, StorageOperations storageOps) + public NodeMoveJob(UUID jobId, UUID nodeId, String newToken, StorageOperations storageOps) { - super(jobId); + super(jobId, nodeId); this.newToken = newToken; this.storageOperations = storageOps; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java index 660f7db0d..fa3b11449 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -18,6 +18,8 @@ package org.apache.cassandra.sidecar.job; +import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -34,6 +36,7 @@ import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.tasks.Task; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * An abstract class representing operational jobs that run on Cassandra @@ -44,20 +47,59 @@ public abstract class OperationalJob implements Task // use v1 time-based uuid private final UUID jobId; + // The Cassandra host UUID for single-node jobs; null for cluster-wide jobs that manage node lists themselves + @Nullable + private final UUID nodeId; private final Promise executionPromise; private volatile boolean isExecuting = false; + private volatile Long startTime; + private volatile String lastUpdate; + + // Node tracking fields for job progress + private volatile List nodesPending; + private volatile List nodesExecuting; + private volatile List nodesSucceeded; + private volatile List nodesFailed; /** - * Constructs a job with a unique UUID, in Pending state + * Constructs a job with a unique UUID, in Pending state. + * Node tracking lists are initialized to empty; cluster-wide job subclasses manage them via protected setters. * * @param jobId UUID representing the Job to be created */ protected OperationalJob(UUID jobId) + { + this(jobId, null); + } + + /** + * Constructs a job with a unique UUID and an associated Cassandra node. + * When {@code nodeId} is provided, node tracking lists are automatically initialized + * and managed throughout the job lifecycle in {@link #execute(Promise)}. + * When {@code nodeId} is {@code null}, lists are initialized to empty and can be + * managed by cluster-wide job subclasses via protected setters. + * + * @param jobId UUID representing the Job to be created + * @param nodeId the Cassandra host UUID for this single-node job, or {@code null} for cluster-wide jobs + */ + protected OperationalJob(UUID jobId, @Nullable UUID nodeId) { Preconditions.checkArgument(jobId.version() == 1, "OperationalJob accepts only time-based UUID"); this.jobId = jobId; + this.nodeId = nodeId; this.executionPromise = Promise.promise(); + if (nodeId != null) + { + this.nodesPending = Collections.singletonList(nodeId); + } + else + { + this.nodesPending = Collections.emptyList(); + } + this.nodesExecuting = Collections.emptyList(); + this.nodesSucceeded = Collections.emptyList(); + this.nodesFailed = Collections.emptyList(); } public UUID jobId() @@ -65,6 +107,15 @@ public UUID jobId() return jobId; } + /** + * @return the Cassandra host UUID for single-node jobs, or {@code null} for cluster-wide jobs + */ + @Nullable + public UUID nodeId() + { + return nodeId; + } + @Override public final Void result() { @@ -80,6 +131,94 @@ public long creationTime() return UUIDs.unixTimestamp(jobId); } + /** + * @return unix timestamp in milliseconds of when the job execution started, or {@code null} if not yet started + */ + @Nullable + public Long startTime() + { + return startTime; + } + + /** + * @return ISO-8601 formatted start time string, or {@code null} if not yet started + */ + @Nullable + public String formattedStartTime() + { + return startTime != null ? Instant.ofEpochMilli(startTime).toString() : null; + } + + /** + * @return list of node IDs pending execution + */ + @NotNull + public List nodesPending() + { + return nodesPending; + } + + protected void nodesPending(List nodesPending) + { + this.nodesPending = nodesPending; + } + + /** + * @return list of node IDs currently executing + */ + @NotNull + public List nodesExecuting() + { + return nodesExecuting; + } + + protected void nodesExecuting(List nodesExecuting) + { + this.nodesExecuting = nodesExecuting; + } + + /** + * @return list of node IDs that have succeeded + */ + @NotNull + public List nodesSucceeded() + { + return nodesSucceeded; + } + + protected void nodesSucceeded(List nodesSucceeded) + { + this.nodesSucceeded = nodesSucceeded; + } + + /** + * @return list of node IDs that have failed + */ + @NotNull + public List nodesFailed() + { + return nodesFailed; + } + + protected void nodesFailed(List nodesFailed) + { + this.nodesFailed = nodesFailed; + } + + /** + * @return a human-readable status message, or {@code null} if not set + */ + @Nullable + public String lastUpdate() + { + return lastUpdate; + } + + protected void lastUpdate(String lastUpdate) + { + this.lastUpdate = lastUpdate; + } + /** * @return whether the operational job is executing or not. */ @@ -203,6 +342,13 @@ public Future asyncResult(TaskExecutorPool executorPool, DurationSpec wait public void execute(Promise promise) { isExecuting = true; + startTime = System.currentTimeMillis(); + lastUpdate = String.format("Started %s %s", name(), jobId); + if (nodeId != null) + { + nodesPending = Collections.emptyList(); + nodesExecuting = Collections.singletonList(nodeId); + } LOGGER.info("Executing job. jobId={}", jobId); promise.future().onComplete(executionPromise); try @@ -211,11 +357,23 @@ public void execute(Promise promise) internalFuture.onComplete(ar -> { if (ar.succeeded()) { + if (nodeId != null) + { + nodesExecuting = Collections.emptyList(); + nodesSucceeded = Collections.singletonList(nodeId); + } + lastUpdate = String.format("%s %s completed", name(), jobId); promise.tryComplete(); LOGGER.info("Complete job execution. jobId={} status={}", jobId, status()); } else { + if (nodeId != null) + { + nodesExecuting = Collections.emptyList(); + nodesFailed = Collections.singletonList(nodeId); + } + lastUpdate = String.format("%s %s failed with %s", name(), jobId, ar.cause().getMessage()); promise.tryFail(ar.cause()); LOGGER.error("Job execution failed. jobId={} reason={}", jobId, ar.cause().getMessage()); } @@ -223,7 +381,13 @@ public void execute(Promise promise) } catch (Throwable e) { + if (nodeId != null) + { + nodesExecuting = Collections.emptyList(); + nodesFailed = Collections.singletonList(nodeId); + } OperationalJobException oje = OperationalJobException.wraps(e); + lastUpdate = String.format("%s %s failed with %s", name(), jobId, oje.getMessage()); LOGGER.error("Job execution failed. jobId={} reason={}", jobId, oje.getMessage(), oje); promise.tryFail(oje); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java index 0edd5fd1b..50cafb1c2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java @@ -54,7 +54,11 @@ public static void sendStatusBasedResponse(RoutingContext context, OperationalJo String reason = exception.getMessage(); LOGGER.error("Conflicting job encountered. reason={}", reason); context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); - context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); + context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason, + job.formattedStartTime(), + job.nodesPending(), job.nodesExecuting(), + job.nodesSucceeded(), job.nodesFailed(), + job.lastUpdate())); return; } @@ -74,6 +78,10 @@ public static void sendStatusBasedResponse(RoutingContext context, OperationalJo { reason = job.asyncResult().cause().getMessage(); } - context.json(new OperationalJobResponse(job.jobId(), status, job.name(), reason)); + context.json(new OperationalJobResponse(job.jobId(), status, job.name(), reason, + job.formattedStartTime(), + job.nodesPending(), job.nodesExecuting(), + job.nodesSucceeded(), job.nodesFailed(), + job.lastUpdate())); } } diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java index 005e46d0c..7a4a6e3d1 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java @@ -42,7 +42,7 @@ import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** * Test the node decommission endpoint with cassandra container. @@ -54,16 +54,18 @@ public class NodeDecommissionIntegrationTest extends IntegrationTestBase void decommissionNodeDefault(VertxTestContext context) { final AtomicReference jobId = new AtomicReference<>(); + final AtomicReference expectedNodeId = new AtomicReference<>(); String testRoute = "/api/v1/cassandra/operations/decommission?force=true"; testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", testRoute) .send(context.succeeding(response -> { OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); assertThat(decommissionResponse.status()).isEqualTo(RUNNING); jobId.set(String.valueOf(decommissionResponse.jobId())); + expectedNodeId.set(decommissionResponse.nodesExecuting().get(0)); }))); Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); assertThat(jobId.get()).isNotNull(); - pollStatusForState(jobId.get(), SUCCEEDED, null); + pollStatusForState(jobId.get(), SUCCEEDED, null, expectedNodeId.get()); context.completeNow(); } @@ -84,7 +86,8 @@ void decommissionNodeWithFailure(VertxTestContext context) private void pollStatusForState(String uuid, OperationalJobStatus expectedStatus, - String expectedReason) + String expectedReason, + UUID expectedNodeId) { String status = "/api/v1/cassandra/operational-jobs/" + uuid; AtomicBoolean stateReached = new AtomicBoolean(false); @@ -110,12 +113,31 @@ private void pollStatusForState(String uuid, assertThat(jobStatusResp.status()).isEqualTo(expectedStatus); assertThat(jobStatusResp.reason()).isEqualTo(expectedReason); assertThat(jobStatusResp.operation()).isEqualTo("decommission"); + assertThat(jobStatusResp.startTime()).isNotNull(); + assertThat(jobStatusResp.nodesPending()).isEmpty(); + assertThat(jobStatusResp.nodesExecuting()).isEmpty(); + if (expectedStatus == SUCCEEDED) + { + assertThat(jobStatusResp.lastUpdate()).contains("completed"); + assertThat(jobStatusResp.nodesSucceeded()).containsExactly(expectedNodeId); + assertThat(jobStatusResp.nodesFailed()).isEmpty(); + } + else if (expectedStatus == FAILED) + { + assertThat(jobStatusResp.lastUpdate()).contains("failed"); + assertThat(jobStatusResp.nodesSucceeded()).isEmpty(); + assertThat(jobStatusResp.nodesFailed()).containsExactly(expectedNodeId); + } } else { assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code()); OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class); assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid)); + assertThat(jobStatusResp.nodesPending()).isEmpty(); + assertThat(jobStatusResp.nodesExecuting()).containsExactly(expectedNodeId); + assertThat(jobStatusResp.nodesSucceeded()).isEmpty(); + assertThat(jobStatusResp.nodesFailed()).isEmpty(); } logger.info("Request completed"); assertThat(stateReached.get()).isTrue(); diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java index 4fb0ca9d1..5d32dbb04 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java @@ -58,7 +58,7 @@ void setup() { mockStorageOperations = mock(StorageOperations.class); jobId = UUIDs.timeBased(); - nodeDrainJob = new NodeDrainJob(jobId, mockStorageOperations); + nodeDrainJob = new NodeDrainJob(jobId, null, mockStorageOperations); } @Test diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java index 871ec7b07..34fc4c516 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java @@ -61,14 +61,14 @@ void setUp() @Test void testJobName() { - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.name()).isEqualTo(OPERATION_MOVE); } @Test void testJobId() { - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.jobId()).isEqualTo(jobId); } @@ -76,7 +76,7 @@ void testJobId() void testIsRunningOnCassandraWhenMoving() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.hasConflict(Collections.emptyList())).isTrue(); } @@ -84,7 +84,7 @@ void testIsRunningOnCassandraWhenMoving() void testIsRunningOnCassandraWhenNormal() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.hasConflict(Collections.emptyList())).isFalse(); } @@ -92,7 +92,7 @@ void testIsRunningOnCassandraWhenNormal() void testIsRunningOnCassandraWhenOtherMode() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_JOINING); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.hasConflict(Collections.emptyList())).isFalse(); } @@ -100,7 +100,7 @@ void testIsRunningOnCassandraWhenOtherMode() void testStatusWhenNormal() { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED); } @@ -111,7 +111,7 @@ void testStatusWhenFailed() throws IOException RuntimeException testException = new RuntimeException("Test failure"); doThrow(testException).when(mockStorageOperations).move(newToken); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); Promise promise = Promise.promise(); job.execute(promise); @@ -124,7 +124,7 @@ void testStatusWhenFailed() throws IOException void testExecuteInternalCallsMove() throws IOException { when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); Promise promise = Promise.promise(); job.execute(promise); @@ -140,7 +140,7 @@ void testExecuteInternalHandlesException() throws IOException RuntimeException testException = new RuntimeException("Test exception"); doThrow(testException).when(mockStorageOperations).move(newToken); - NodeMoveJob job = new NodeMoveJob(jobId, newToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, newToken, mockStorageOperations); Promise promise = Promise.promise(); job.execute(promise); @@ -156,7 +156,7 @@ void testExecuteInternalHandlesException() throws IOException void testJobWithNegativeToken() { String negativeToken = "-9223372036854775808"; - NodeMoveJob job = new NodeMoveJob(jobId, negativeToken, mockStorageOperations); + NodeMoveJob job = new NodeMoveJob(jobId, null, negativeToken, mockStorageOperations); assertThat(job.name()).isEqualTo(OPERATION_MOVE); assertThat(job.jobId()).isEqualTo(jobId); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java index f3f495b7c..5cda883af 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java @@ -123,7 +123,12 @@ public static OperationalJob createOperationalJob(UUID jobId, DurationSpec jobDu public static OperationalJob createOperationalJob(UUID jobId, DurationSpec jobDuration, OperationalJobException jobFailure) { - return new OperationalJob(jobId) + return createOperationalJob(jobId, null, jobDuration, jobFailure); + } + + public static OperationalJob createOperationalJob(UUID jobId, UUID nodeId, DurationSpec jobDuration, OperationalJobException jobFailure) + { + return new OperationalJob(jobId, nodeId) { @Override public boolean hasConflict(List jobs) @@ -170,6 +175,7 @@ void testJobCompletion() assertThat(future.succeeded()).isTrue(); assertThat(job.asyncResult().succeeded()).isTrue(); assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + assertThat(job.lastUpdate()).contains("completed"); } @Test @@ -204,6 +210,7 @@ protected Future executeInternal() throws OperationalJobException assertThat(failingJob.asyncResult().cause()) .isExactlyInstanceOf(OperationalJobException.class) .hasMessage(msg); + assertThat(failingJob.lastUpdate()).contains("failed"); } @Test @@ -245,4 +252,68 @@ void testGetAsyncResultExceedsWaitTime() assertThat(longRunning.isExecuting()).isTrue(); }); } + + @Test + void testNodeListsEmptyWhenNoNodeId() + { + OperationalJob job = createOperationalJob(OperationalJobStatus.CREATED); + assertThat(job.nodeId()).isNull(); + assertThat(job.nodesPending()).isEmpty(); + assertThat(job.nodesExecuting()).isEmpty(); + assertThat(job.nodesSucceeded()).isEmpty(); + assertThat(job.nodesFailed()).isEmpty(); + } + + @Test + void testNodeListsInitializedWhenNodeIdProvided() + { + UUID nodeId = UUID.randomUUID(); + OperationalJob job = createOperationalJob(UUIDs.timeBased(), nodeId, null, null); + assertThat(job.nodeId()).isEqualTo(nodeId); + assertThat(job.nodesPending()).containsExactly(nodeId); + assertThat(job.nodesExecuting()).isEmpty(); + assertThat(job.nodesSucceeded()).isEmpty(); + assertThat(job.nodesFailed()).isEmpty(); + } + + @Test + void testNodeListsTransitionOnSuccess() + { + UUID nodeId = UUID.randomUUID(); + OperationalJob job = createOperationalJob(UUIDs.timeBased(), nodeId, + MillisecondBoundConfiguration.parse("10ms"), null); + job.execute(Promise.promise()); + + assertThat(job.nodesPending()).isEmpty(); + assertThat(job.nodesExecuting()).isEmpty(); + assertThat(job.nodesSucceeded()).containsExactly(nodeId); + assertThat(job.nodesFailed()).isEmpty(); + } + + @Test + void testNodeListsTransitionOnFailure() + { + UUID nodeId = UUID.randomUUID(); + OperationalJobException failure = new OperationalJobException("test failure"); + OperationalJob job = createOperationalJob(UUIDs.timeBased(), nodeId, + null, failure); + job.execute(Promise.promise()); + + assertThat(job.nodesPending()).isEmpty(); + assertThat(job.nodesExecuting()).isEmpty(); + assertThat(job.nodesSucceeded()).isEmpty(); + assertThat(job.nodesFailed()).containsExactly(nodeId); + } + + @Test + void testStartTimeIsSetWhenJobStarts() + { + OperationalJob job = createOperationalJob(UUIDs.timeBased(), + MillisecondBoundConfiguration.parse("10ms")); + long before = System.currentTimeMillis(); + job.execute(Promise.promise()); + long after = System.currentTimeMillis(); + + assertThat(job.startTime()).isBetween(before, after); + } }