Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

0.3.0
-----
* Adding endpoint for verifying files post data copy during live migration (CASSSIDECAR-226)
* RangeManager should be singleton in CDCModule (CASSSIDECAR-411)
* CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public final class ApiEndpointsV1
public static final String LIVE_MIGRATION_DATA_COPY_TASK_ROUTE = LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE + "/:taskId";
public static final String LIVE_MIGRATION_STATUS_ROUTE = LIVE_MIGRATION_API_PREFIX + "/status";

public static final String LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE = LIVE_MIGRATION_API_PREFIX + "/files-verification-tasks";
public static final String LIVE_MIGRATION_FILES_VERIFICATION_TASK_ROUTE = LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE + "/:taskId";

public static final String OPENAPI_JSON_ROUTE = "/spec/openapi.json";
public static final String OPENAPI_YAML_ROUTE = "/spec/openapi.yaml";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.request;

import java.util.Objects;

import io.netty.handler.codec.http.HttpMethod;
import org.apache.cassandra.sidecar.common.response.DigestResponse;

/**
* Represents a request to retrieve file digest for validation during live migration.
* Supports configurable digest algorithms for verification.
*/
public class LiveMigrationFileDigestRequest extends JsonRequest<DigestResponse>
{
public static final String DIGEST_ALGORITHM_PARAM = "digestAlgorithm";

/**
* Private constructor for internal use by the factory method
*
* @param fullRequestURI the complete URI with query parameters
*/
private LiveMigrationFileDigestRequest(String fullRequestURI)
{
super(fullRequestURI);
}

/**
* Creates a live migration file digest request with validation
*
* @param requestURI the base URI of the request
* @param digestAlgorithm the digest algorithm to use (e.g., "MD5", "XXHash32")
* @return a new LiveMigrationFileDigestRequest instance
* @throws NullPointerException if requestURI or digestAlgorithm is null
* @throws IllegalArgumentException if digestAlgorithm is empty
*/
public static LiveMigrationFileDigestRequest create(String requestURI, String digestAlgorithm)
{
Objects.requireNonNull(requestURI, "requestURI cannot be null");
Objects.requireNonNull(digestAlgorithm, "digestAlgorithm cannot be null");
if (digestAlgorithm.isEmpty())
{
throw new IllegalArgumentException("digestAlgorithm cannot be empty");
}

String fullURI = String.format("%s?%s=%s", requestURI, DIGEST_ALGORITHM_PARAM, digestAlgorithm);

return new LiveMigrationFileDigestRequest(fullURI);
}

@Override
public HttpMethod method()
{
return HttpMethod.GET;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.request;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Request to verify file integrity during live migration by computing and comparing digests.
* Supports configurable concurrency and digest algorithms.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class LiveMigrationFilesVerificationRequest
{
private final int maxConcurrency;
private final String digestAlgorithm;

@JsonCreator
public LiveMigrationFilesVerificationRequest(@JsonProperty("maxConcurrency") int maxConcurrency,
@JsonProperty("digestAlgorithm") String digestAlgorithm)
{
if (maxConcurrency <= 0)
{
throw new IllegalArgumentException("maxConcurrency must be >= 1");
}
if (digestAlgorithm == null || digestAlgorithm.trim().isEmpty())
{
throw new IllegalArgumentException("digestAlgorithm cannot be null or empty");
}

this.maxConcurrency = maxConcurrency;
this.digestAlgorithm = digestAlgorithm;
}

@JsonProperty("maxConcurrency")
public int maxConcurrency()
{
return maxConcurrency;
}

@JsonProperty("digestAlgorithm")
public String digestAlgorithm()
{
return digestAlgorithm;
}

@Override
public String toString()
{
return "LiveMigrationFilesVerificationRequest{" +
"maxConcurrency=" + maxConcurrency +
", digestAlgorithm='" + digestAlgorithm + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
public class MD5Digest implements Digest
{
public static final String MD5_ALGORITHM = "MD5";

private final @NotNull String value;

/**
Expand All @@ -54,7 +56,7 @@ public String value()
@Override
public String algorithm()
{
return "MD5";
return MD5_ALGORITHM;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/
public class XXHash32Digest implements Digest
{
public static final String XXHASH_32_ALGORITHM = "XXHash32";
private final @NotNull String value;
private final @Nullable String seedHex;

Expand Down Expand Up @@ -90,7 +91,7 @@ public String value()
@Override
public String algorithm()
{
return "XXHash32";
return XXHASH_32_ALGORITHM;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.common.response;

import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.cassandra.sidecar.common.request.data.Digest;
import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;

/**
* Response object containing a cryptographic digest value for file verification purposes.
*/
public class DigestResponse
{
@JsonProperty("digest")
public final String digest;
@JsonProperty("digestAlgorithm")
public final String digestAlgorithm;

@JsonCreator
public DigestResponse(@JsonProperty("digest") String digest,
@JsonProperty("digestAlgorithm") String digestAlgorithm)
{
Objects.requireNonNull(digest, "digest is required");
Objects.requireNonNull(digestAlgorithm, "digestAlgorithm is required");
this.digest = digest;
this.digestAlgorithm = digestAlgorithm;
}

@Override
public String toString()
{
return "DigestResponse{" +
"digest='" + digest + '\'' +
", digestAlgorithm='" + digestAlgorithm + '\'' +
'}';
}

@JsonIgnore
public Digest toDigest()
{
if (digestAlgorithm.equalsIgnoreCase(MD5Digest.MD5_ALGORITHM))
{
return new MD5Digest(digest);
}
else if (digestAlgorithm.equalsIgnoreCase(XXHash32Digest.XXHASH_32_ALGORITHM))
{
return new XXHash32Digest(digest);
}

throw new IllegalArgumentException("Digest algorithm " + digestAlgorithm + " is unknown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,15 @@ public int hashCode()
{
return Objects.hash(fileUrl, size, fileType, lastModifiedTime);
}

@Override
public String toString()
{
return "InstanceFileInfo{" +
"fileUrl='" + fileUrl + '\'' +
", size=" + size +
", fileType=" + fileType +
", lastModifiedTime=" + lastModifiedTime +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Represents response of Live Migration task.
*/
public class LiveMigrationTaskResponse
public class LiveMigrationDataCopyResponse
{

private final String taskId;
Expand All @@ -40,23 +40,23 @@ public class LiveMigrationTaskResponse
private final String source;
private final int port;

public LiveMigrationTaskResponse(String taskId,
String source,
int port,
LiveMigrationDataCopyRequest request,
List<Status> statusList)
public LiveMigrationDataCopyResponse(String taskId,
String source,
int port,
LiveMigrationDataCopyRequest request,
List<Status> statusList)
{
this(taskId, source, port, request.maxIterations, request.successThreshold, request.maxConcurrency, statusList);
}

@JsonCreator
public LiveMigrationTaskResponse(@JsonProperty("taskId") String taskId,
@JsonProperty("source") String source,
@JsonProperty("port") int port,
@JsonProperty("maxIterations") int maxIterations,
@JsonProperty("successThreshold") double successThreshold,
@JsonProperty("maxConcurrency") int maxConcurrency,
@JsonProperty("status") List<Status> statusList)
public LiveMigrationDataCopyResponse(@JsonProperty("taskId") String taskId,
@JsonProperty("source") String source,
@JsonProperty("port") int port,
@JsonProperty("maxIterations") int maxIterations,
@JsonProperty("successThreshold") double successThreshold,
@JsonProperty("maxConcurrency") int maxConcurrency,
@JsonProperty("status") List<Status> statusList)
{
this.taskId = taskId;
this.source = source;
Expand Down
Loading