Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

0.3.0
-----
* Update OperationalJob class to support cluster-wide operations (CASSSIDECAR-376)
* RangeManager should be singleton in CDCModule (CASSSIDECAR-411)
* CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.sidecar.common.response;

import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -37,17 +38,35 @@ public class OperationalJobResponse
private final OperationalJobStatus status;
private final String operation;
private final String reason;
private final String startTime;
private final List<UUID> nodesPending;
private final List<UUID> nodesExecuting;
private final List<UUID> nodesSucceeded;
private final List<UUID> nodesFailed;
private final String lastUpdate;

/**
 * Constructs an operational job response. The constructor is the {@code @JsonCreator},
 * so every argument is bound from the JSON property named in its {@code @JsonProperty}
 * annotation (note that the status is bound from {@code jobStatus}). Each argument is
 * stored as-is in the corresponding field; no validation or defensive copy is performed.
 *
 * @param jobId          the identifier of the job
 * @param status         the status of the job
 * @param operation      the name of the operation
 * @param reason         the reason attached to the response
 *                       NOTE(review): presumably a failure reason; confirm against callers
 * @param startTime      the time the job execution started
 * @param nodesPending   list of node IDs pending execution
 * @param nodesExecuting list of node IDs currently executing
 * @param nodesSucceeded list of node IDs that have succeeded
 * @param nodesFailed    list of node IDs that have failed
 * @param lastUpdate     a human-readable status message
 */
@JsonCreator
public OperationalJobResponse(@JsonProperty("jobId") UUID jobId,
@JsonProperty("jobStatus") OperationalJobStatus status,
@JsonProperty("operation") String operation,
@JsonProperty("reason") String reason)
@JsonProperty("reason") String reason,
@JsonProperty("startTime") String startTime,
@JsonProperty("nodesPending") List<UUID> nodesPending,
@JsonProperty("nodesExecuting") List<UUID> nodesExecuting,
@JsonProperty("nodesSucceeded") List<UUID> nodesSucceeded,
@JsonProperty("nodesFailed") List<UUID> nodesFailed,
@JsonProperty("lastUpdate") String lastUpdate)
{
this.jobId = jobId;
this.status = status;
this.operation = operation;
this.reason = reason;
this.startTime = startTime;
this.nodesPending = nodesPending;
this.nodesExecuting = nodesExecuting;
this.nodesSucceeded = nodesSucceeded;
this.nodesFailed = nodesFailed;
this.lastUpdate = lastUpdate;
}

/**
Expand Down Expand Up @@ -86,4 +105,57 @@ public String reason()
return reason;
}

/**
 * Exposes the job start time under the {@code startTime} JSON property.
 *
 * @return the start time of the job execution, exactly as it was supplied to the constructor
 */
@JsonProperty("startTime")
public String startTime()
{
    return this.startTime;
}

/**
 * Exposes the pending nodes under the {@code nodesPending} JSON property.
 * The stored list is returned directly; no copy is made.
 *
 * @return the IDs of the nodes that are pending execution
 */
@JsonProperty("nodesPending")
public List<UUID> nodesPending()
{
    return this.nodesPending;
}

/**
 * Exposes the executing nodes under the {@code nodesExecuting} JSON property.
 * The stored list is returned directly; no copy is made.
 *
 * @return the IDs of the nodes that are currently executing
 */
@JsonProperty("nodesExecuting")
public List<UUID> nodesExecuting()
{
    return this.nodesExecuting;
}

/**
 * Exposes the succeeded nodes under the {@code nodesSucceeded} JSON property.
 * The stored list is returned directly; no copy is made.
 *
 * @return the IDs of the nodes that have succeeded
 */
@JsonProperty("nodesSucceeded")
public List<UUID> nodesSucceeded()
{
    return this.nodesSucceeded;
}

/**
 * Exposes the failed nodes under the {@code nodesFailed} JSON property.
 * The stored list is returned directly; no copy is made.
 *
 * @return the IDs of the nodes that have failed
 */
@JsonProperty("nodesFailed")
public List<UUID> nodesFailed()
{
    return this.nodesFailed;
}

/**
 * Exposes the latest status message under the {@code lastUpdate} JSON property.
 *
 * @return the human-readable status message, exactly as it was supplied to the constructor
 */
@JsonProperty("lastUpdate")
public String lastUpdate()
{
    return this.lastUpdate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ protected void beforeTestStart()
@Test
void testNodeDrainOperationSuccess()
{
String expectedHostId = getRingEntryForNode("localhost").hostId();

// Initiate drain operation
HttpResponse<Buffer> drainResponse = getBlocking(
trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_DRAIN_ROUTE)
Expand Down Expand Up @@ -102,7 +104,7 @@ void testNodeDrainOperationSuccess()

// Validate the operational job status using the OperationalJobHandler
String jobId = responseBody.getString("jobId");
validateOperationalJobStatus(jobId, "drain", OperationalJobStatus.SUCCEEDED);
validateOperationalJobStatus(jobId, "drain", OperationalJobStatus.SUCCEEDED, expectedHostId);
}


Expand All @@ -114,7 +116,7 @@ void testNodeMoveOperationSuccess()
String requestBody = "{\"newToken\":\"" + testToken + "\"}";

// Validate that the node owns a different token than testToken
String currentToken = getCurrentTokenForNode("localhost");
String currentToken = getRingEntryForNode("localhost").token();
assertThat(currentToken).isNotEqualTo(testToken);

// Initiate move operation
Expand Down Expand Up @@ -151,10 +153,11 @@ void testNodeMoveOperationSuccess()

// Validate the operational job status using the OperationalJobHandler
String jobId = responseBody.getString("jobId");
validateOperationalJobStatus(jobId, "move", OperationalJobStatus.SUCCEEDED);
String expectedHostId = getRingEntryForNode("localhost").hostId();
validateOperationalJobStatus(jobId, "move", OperationalJobStatus.SUCCEEDED, expectedHostId);

// Validate that the node actually owns the new token
currentToken = getCurrentTokenForNode("localhost");
currentToken = getRingEntryForNode("localhost").token();
assertThat(currentToken).isEqualTo(testToken);
}

Expand All @@ -175,11 +178,11 @@ void testNodeMoveOperationSuccess()
void testNodeMoveOperationFailure()
{
// Get a token already owned by a node
String testToken = getCurrentTokenForNode("localhost2");
String testToken = getRingEntryForNode("localhost2").token();
String requestBody = "{\"newToken\":\"" + testToken + "\"}";

// Validate that the node owns a different token than testToken
String initialToken = getCurrentTokenForNode("localhost");
String initialToken = getRingEntryForNode("localhost").token();
assertThat(initialToken).isNotEqualTo(testToken);

// Initiate move operation
Expand Down Expand Up @@ -216,36 +219,32 @@ void testNodeMoveOperationFailure()

// Validate the operational job status using the OperationalJobHandler
String jobId = responseBody.getString("jobId");
validateOperationalJobStatus(jobId, "move", OperationalJobStatus.FAILED);
String expectedHostId = getRingEntryForNode("localhost").hostId();
validateOperationalJobStatus(jobId, "move", OperationalJobStatus.FAILED, expectedHostId);

// Validate that the node didn't move
String currentToken = getCurrentTokenForNode("localhost");
String currentToken = getRingEntryForNode("localhost").token();
assertThat(currentToken).isEqualTo(initialToken);
assertThat(currentToken).isNotEqualTo(testToken);
}

/**
* Gets the current token for the specified node by querying the ring endpoint.
* Gets the ring entry for the specified node by querying the ring endpoint.
*
* @param node the node hostname to get the token for
* @return the token currently owned by the specified node
* @param node the node hostname to look up
* @return the {@link RingEntry} for the specified node
*/
private String getCurrentTokenForNode(String node)
private RingEntry getRingEntryForNode(String node)
{
HttpResponse<Buffer> ringResponse = getBlocking(
trustedClient().get(serverWrapper.serverPort, node, ApiEndpointsV1.RING_ROUTE)
.send());

assertThat(ringResponse.statusCode()).isEqualTo(OK.code());

RingResponse ring = ringResponse.bodyAsJson(RingResponse.class);
assertThat(ring).isNotNull();

RingEntry ringEntry = ring.stream()
.filter(entry -> entry.fqdn().equals(node))
.findFirst()
.orElseThrow(() -> new AssertionError("Node " + node + " not found in ring"));
return ringEntry.token();
return ring.stream()
.filter(entry -> entry.fqdn().equals(node))
.findFirst()
.orElseThrow(() -> new AssertionError("Node " + node + " not found in ring"));
}

/**
Expand All @@ -254,8 +253,11 @@ private String getCurrentTokenForNode(String node)
*
* @param jobId the ID of the operational job to validate
* @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain")
* @param expectedEndStatus the expected final status of the job
* @param expectedHostId the expected Cassandra host ID in the node tracking lists
*/
private void validateOperationalJobStatus(String jobId, String expectedOperation, OperationalJobStatus expectedEndStatus)
private void validateOperationalJobStatus(String jobId, String expectedOperation,
OperationalJobStatus expectedEndStatus, String expectedHostId)
{
String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId);

Expand All @@ -270,9 +272,14 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation
assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);

// If the job is still running, wait for it to complete or reach a final state
// If the job is still running, verify node tracking lists reflect the executing state
if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus")))
{
assertThat(jobStatusBody.getJsonArray("nodesPending")).isEmpty();
assertThat(jobStatusBody.getJsonArray("nodesExecuting")).containsExactly(expectedHostId);
assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).isEmpty();
assertThat(jobStatusBody.getJsonArray("nodesFailed")).isEmpty();

loopAssert(30, 500, () -> {
HttpResponse<Buffer> finalJobStatusResponse = getBlocking(
trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute)
Expand Down Expand Up @@ -300,6 +307,21 @@ private void validateOperationalJobStatus(String jobId, String expectedOperation
assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);
assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedEndStatus.name());
assertThat(jobStatusBody.getString("startTime")).isNotNull();
assertThat(jobStatusBody.getJsonArray("nodesPending")).isEmpty();
assertThat(jobStatusBody.getJsonArray("nodesExecuting")).isEmpty();
if (expectedEndStatus == OperationalJobStatus.SUCCEEDED)
{
assertThat(jobStatusBody.getString("lastUpdate")).contains("completed");
assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).containsExactly(expectedHostId);
assertThat(jobStatusBody.getJsonArray("nodesFailed")).isEmpty();
}
else if (expectedEndStatus == OperationalJobStatus.FAILED)
{
assertThat(jobStatusBody.getString("lastUpdate")).contains("failed");
assertThat(jobStatusBody.getJsonArray("nodesSucceeded")).isEmpty();
assertThat(jobStatusBody.getJsonArray("nodesFailed")).containsExactly(expectedHostId);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.management.Notification;
Expand Down Expand Up @@ -520,6 +521,25 @@ public boolean isJmxUp()
return nodeSettingsFromJmx != null;
}

/**
 * Looks up the host ID of the local Cassandra node through a JMX proxy of the
 * storage service and parses it into a {@link UUID}.
 *
 * @return the Cassandra host UUID fetched via JMX
 * @throws CassandraUnavailableException when the JMX lookup or the UUID parsing
 *                                       fails with a {@link RuntimeException}
 */
@NotNull
public UUID hostId() throws CassandraUnavailableException
{
    try
    {
        String localHostId = jmxClient.proxy(LimitedStorageOperations.class, STORAGE_SERVICE_OBJ_NAME)
                                      .getLocalHostId();
        return UUID.fromString(localHostId);
    }
    catch (RuntimeException cause)
    {
        // Any runtime failure is reported to callers as JMX being unavailable,
        // with the original exception preserved as the cause
        throw new CassandraUnavailableException(JMX, cause);
    }
}

public void close()
{
closed = true;
Expand Down Expand Up @@ -689,6 +709,11 @@ public interface LimitedStorageOperations
* @return a collection of tokens formatted as strings
*/
List<String> getTokens();

/**
 * Reads the host ID of the local Cassandra node. The enclosing class's {@code hostId()}
 * parses the returned value with {@code UUID.fromString}.
 *
 * @return the local host ID for this Cassandra node, as a string
 */
String getLocalHostId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.jetbrains.annotations.NotNull;

import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;

/**
* Handler for retrieving all the jobs running on the sidecar
*/
Expand Down Expand Up @@ -72,7 +70,11 @@ protected void handleInternal(RoutingContext context, HttpServerRequest httpRequ
ListOperationalJobsResponse listResponse = new ListOperationalJobsResponse();
jobManager.allInflightJobs()
.stream()
.map(job -> new OperationalJobResponse(job.jobId(), RUNNING, job.name(), null))
.map(job -> new OperationalJobResponse(job.jobId(), job.status(), job.name(), null,
job.formattedStartTime(),
job.nodesPending(), job.nodesExecuting(),
job.nodesSucceeded(), job.nodesFailed(),
job.lastUpdate()))
.forEach(listResponse::addJob);
context.json(listResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.Set;
import java.util.UUID;

import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
Expand All @@ -28,6 +29,7 @@
import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
Expand Down Expand Up @@ -83,8 +85,10 @@ public void handleInternal(RoutingContext context,
SocketAddress remoteAddress,
Boolean isForce)
{
StorageOperations operations = metadataFetcher.delegate(host).storageOperations();
NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), operations, isForce);
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
StorageOperations operations = delegate.storageOperations();
UUID nodeId = delegate.hostId();
NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), nodeId, operations, isForce);
this.jobManager.trySubmitJob(job,
(completedJob, exception) ->
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.Set;
import java.util.UUID;

import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
Expand All @@ -28,6 +29,7 @@
import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
Expand Down Expand Up @@ -81,8 +83,10 @@ public void handleInternal(RoutingContext context,
SocketAddress remoteAddress,
Void unused)
{
StorageOperations operations = metadataFetcher.delegate(host).storageOperations();
NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), operations);
CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
StorageOperations operations = delegate.storageOperations();
UUID nodeId = delegate.hostId();
NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), nodeId, operations);
this.jobManager.trySubmitJob(job,
(completedJob, exception) ->
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
Expand Down
Loading
Loading