Pauseless Consumption #3: Disaster Recovery with Reingestion #14920

Status: Open. Wants to merge 68 commits into base: master.
Showing changes from 39 of 68 commits.
6defce3
Controller side changes to allow pauseless ingestion:
9aman Dec 19, 2024
54ab7b3
Minor improvements for readability
9aman Dec 20, 2024
1e40134
Server side changes to enable pauseless consumption. The changes include
9aman Dec 23, 2024
012da87
Fixes in the commit protocol for pauseless ingestion to complete the …
9aman Dec 24, 2024
a97847f
Changing the way to enable pauseless post introduction of multi-stream…
9aman Dec 30, 2024
2c2ba86
WIP: Changes in the expected state while performing recovery during p…
9aman Dec 30, 2024
4d7c893
Adding changes for ensurePartitionsConsuming function that is utilize…
9aman Dec 30, 2024
a041a75
Add Server side Reingestion API
Dec 30, 2024
b6d0904
run segment level validation
Dec 30, 2024
58f6c51
Add method to trigger reingestion
Dec 30, 2024
d6313b3
Linting fixes
Dec 30, 2024
ca6134a
Adding integration tests for 3 failure scenarios that can occur durin…
9aman Jan 2, 2025
845f616
WIP: Reingestion test
Jan 6, 2025
d2dd313
Fix bug with indexes in reingestion
Jan 6, 2025
fb34fc8
Add tests for reingestion
Jan 8, 2025
50725bd
Fix controller url in Reingestion
Jan 14, 2025
e74d360
Support https and auth in reingestion
Jan 14, 2025
ce3c851
Merge branch 'master' into resolve-failures-pauseless-ingestion
9aman Jan 15, 2025
d6208a6
Removing checks on default crc and replacing them with segment status…
9aman Jan 15, 2025
7f5b720
Formatting improvements
9aman Jan 15, 2025
3f05b2f
Allowing null table config to be passed for checking pauseless consum…
9aman Jan 15, 2025
2012e38
Ensuring upload retries are backward compatible
9aman Jan 15, 2025
7b9da37
Removing unnecessary code
9aman Jan 15, 2025
ded8962
Fixing existing test cases and adding unit tests to check upload to d…
9aman Jan 16, 2025
e703d84
Refactor test cases to reduce repetition
9aman Jan 16, 2025
c2fda4a
Add missing header file
9aman Jan 16, 2025
c836009
Fix reingestion test
Jan 17, 2025
7974ab9
refactoring file upload download client
Jan 17, 2025
c08f841
Removing pauselessConsumptionEnabled from index config
9aman Jan 21, 2025
c4b99bd
Remove reingestion code
Jan 21, 2025
aee514c
Removing check in replaceSegmentIfCrcMismatch
9aman Jan 23, 2025
88a619a
Adding a new class for simple serialization and deserialization of ma…
9aman Jan 24, 2025
791ac21
Removing files related to reingestion tests
9aman Jan 24, 2025
8db5bae
Merging master and including force commit PR changes
9aman Jan 24, 2025
55b2b29
Revert "Remove reingestion code"
Jan 27, 2025
f74df66
Revert "Removing files related to reingestion tests"
Jan 27, 2025
1f4db11
Fix reingestion issue where consuming segments are not replaced
Jan 27, 2025
8e9249c
Copy full segment to deep store before triggering metadata upload
Jan 27, 2025
b804a69
Refactoring: added support for tracking running reingestion jobs
Jan 27, 2025
609942d
Refactor: fix doc comments
Jan 27, 2025
f939714
Make SegmentZKMetadata JSON serializable
Jackie-Jiang Jan 28, 2025
155c49f
Minor API name change
Jackie-Jiang Jan 28, 2025
080ec55
Refactor PinotLLC class to add ability to inject failures
Jan 28, 2025
7e04fa3
Minor improvements
9aman Jan 28, 2025
523913f
Moving ZkMetadaptaUtils to commons and reusing the code in the upload…
9aman Jan 28, 2025
5689333
Fix lint failures
Jan 28, 2025
f42c6b8
Misc fix and cleanup
Jackie-Jiang Jan 29, 2025
a2eebf9
The tests were running slow due to the condition that the IdealState …
9aman Jan 29, 2025
1ba5b6c
Refactor Reingestion integration test
Jan 29, 2025
11aa170
Merge remote-tracking branch 'upstream/master' into pauseless-reinges…
Jan 29, 2025
e84788a
Fix error in tests post rebase
Jan 29, 2025
0d46327
refactoring
Jan 30, 2025
a94c7e3
Remove redundant code
Feb 3, 2025
8b9b8d1
Add support for queue in reingestion
Feb 3, 2025
d035249
Refactoring
Feb 4, 2025
bc8a65b
refactoring
Feb 4, 2025
f082d24
Honour segment build semaphore during reingestion
Feb 4, 2025
d1ad30b
Fix test
Feb 5, 2025
7e47dd5
Add a separate API to upload reingested segments
Feb 5, 2025
5a42c28
Cleanup code
Feb 5, 2025
1551685
Replace reIngest with reingest
Feb 6, 2025
837aa26
Replace reIngest with reingest
Feb 6, 2025
09e8583
Ensure correctness in reingestion
Feb 6, 2025
3d6fdf4
Decouple reingest completion and upload ZK path
Feb 6, 2025
b525c7e
Refactor Stateless segment writer to take the fewest arguments
Feb 6, 2025
f7ae25f
Fix segment reset for pauseless tables
Feb 6, 2025
3c17957
Cleaning up Stateless segment writer
Feb 6, 2025
a3fa25c
Rename metric
Feb 6, 2025
New file: org/apache/pinot/common/metadata/segment/SimpleSegmentMetadata.java
@@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.metadata.segment;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A simplified version of SegmentZKMetadata designed for easy serialization and deserialization.
* This class maintains only the essential fields from SegmentZKMetadata while providing JSON
* serialization support via Jackson annotations.
*
* Use cases:
* 1. Serializing segment metadata for API responses
* 2. Transferring segment metadata between services
* 3. Storing segment metadata in a format that's easy to deserialize
*
* Example usage:
* SimpleSegmentMetadata metadata = SimpleSegmentMetadata.fromZKMetadata(zkMetadata);
* String json = JsonUtils.objectToString(metadata);
* SimpleSegmentMetadata deserialized = objectMapper.readValue(json, SimpleSegmentMetadata.class);
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class SimpleSegmentMetadata {
  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSegmentMetadata.class);
  private final String _segmentName;
  private final Map<String, String> _simpleFields;

  @JsonCreator
  public SimpleSegmentMetadata(
      @JsonProperty("segmentName") String segmentName,
      @JsonProperty("simpleFields") Map<String, String> simpleFields) {
    _segmentName = segmentName;
    _simpleFields = new HashMap<>(simpleFields);
  }

  @JsonGetter
  public String getSegmentName() {
    return _segmentName;
  }

  @JsonGetter
  public Map<String, String> getSimpleFields() {
    return Collections.unmodifiableMap(_simpleFields);
  }

  public long getStartTimeMs() {
    long startTimeMs = -1;
    String startTimeString = _simpleFields.get(CommonConstants.Segment.START_TIME);
    if (startTimeString != null) {
      long startTime = Long.parseLong(startTimeString);
      if (startTime > 0) {
        // NOTE: Need to check whether the start time is positive because some old segment ZK metadata contains
        // negative start time and null time unit
        startTimeMs = TimeUnit.valueOf(_simpleFields.get(CommonConstants.Segment.TIME_UNIT)).toMillis(startTime);
      }
    }
    return startTimeMs;
  }

  public long getEndTimeMs() {
    long endTimeMs = -1;
    String endTimeString = _simpleFields.get(CommonConstants.Segment.END_TIME);
    if (endTimeString != null) {
      long endTime = Long.parseLong(endTimeString);
      // NOTE: Need to check whether the end time is positive because some old segment ZK metadata contains negative
      // end time and null time unit
      if (endTime > 0) {
        endTimeMs = TimeUnit.valueOf(_simpleFields.get(CommonConstants.Segment.TIME_UNIT)).toMillis(endTime);
      }
    }
    return endTimeMs;
  }

  public String getIndexVersion() {
    return _simpleFields.get(CommonConstants.Segment.INDEX_VERSION);
  }

  public long getTotalDocs() {
    String value = _simpleFields.get(CommonConstants.Segment.TOTAL_DOCS);
    return value != null ? Long.parseLong(value) : -1;
  }

  public SegmentPartitionMetadata getPartitionMetadata() {
    String partitionMetadataJson = _simpleFields.get(CommonConstants.Segment.PARTITION_METADATA);
    if (partitionMetadataJson != null) {
      try {
        return SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
      } catch (Exception e) {
        LOGGER.error("Caught exception while reading partition metadata for segment: {}", getSegmentName(), e);
      }
    }
    return null;
  }

  public long getSizeInBytes() {
    String value = _simpleFields.get(CommonConstants.Segment.SIZE_IN_BYTES);
    return value != null ? Long.parseLong(value) : -1;
  }

  public long getCrc() {
    String value = _simpleFields.get(CommonConstants.Segment.CRC);
    return value != null ? Long.parseLong(value) : -1;
  }

  public String getDownloadUrl() {
    String downloadUrl = _simpleFields.get(CommonConstants.Segment.DOWNLOAD_URL);
    // Handle legacy download url keys
    if (downloadUrl == null) {
      downloadUrl = _simpleFields.get(CommonConstants.Segment.Offline.DOWNLOAD_URL);
      if (downloadUrl == null) {
        downloadUrl = _simpleFields.get(CommonConstants.Segment.Realtime.DOWNLOAD_URL);
      }
    }
    return downloadUrl;
  }

  /**
   * Creates a SimpleSegmentMetadata instance from a SegmentZKMetadata object.
   * This method copies all simple fields from the ZK metadata while maintaining
   * the immutability guarantees of this class.
   */
  public static SimpleSegmentMetadata fromZKMetadata(SegmentZKMetadata zkMetadata) {
    return new SimpleSegmentMetadata(
        zkMetadata.getSegmentName(),
        zkMetadata.toMap()
    );
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimpleSegmentMetadata that = (SimpleSegmentMetadata) o;
    return Objects.equals(_segmentName, that._segmentName)
        && Objects.equals(_simpleFields, that._simpleFields);
  }

  @Override
  public int hashCode() {
    return Objects.hash(_segmentName, _simpleFields);
  }
}
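For a sense of how this class behaves in practice, here is a minimal round-trip sketch. It is illustrative rather than part of the diff: the segment name and field values are made up, and a plain Jackson ObjectMapper stands in for whatever mapper the caller uses. The map keys are the same CommonConstants.Segment constants the accessors above read.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.SimpleSegmentMetadata;
import org.apache.pinot.spi.utils.CommonConstants;

public class SimpleSegmentMetadataRoundTrip {
  public static void main(String[] args) throws Exception {
    // Populate simple fields the way SegmentZKMetadata.toMap() would
    Map<String, String> simpleFields = new HashMap<>();
    simpleFields.put(CommonConstants.Segment.CRC, "123456789");
    simpleFields.put(CommonConstants.Segment.TOTAL_DOCS, "1000");
    simpleFields.put(CommonConstants.Segment.DOWNLOAD_URL, "http://controller:9000/segments/myTable/myTable__0__0");

    SimpleSegmentMetadata metadata = new SimpleSegmentMetadata("myTable__0__0", simpleFields);

    // Serialize for an API response, then deserialize on the receiving side;
    // ignoreUnknown on the class makes the extra derived getters harmless
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(metadata);
    SimpleSegmentMetadata restored = mapper.readValue(json, SimpleSegmentMetadata.class);

    // Typed accessors read straight from the restored simple fields
    System.out.println(restored.getCrc());         // 123456789
    System.out.println(restored.getTotalDocs());   // 1000
    System.out.println(restored.getDownloadUrl()); // the download url set above
  }
}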
File: FileUploadDownloadClient

@@ -55,6 +55,8 @@
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SimpleSegmentMetadata;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
@@ -125,6 +127,7 @@ public static FileUploadType getDefaultUploadType() {
  private static final String FORCE_CLEANUP_PARAMETER = "&forceCleanup=";

  private static final String RETENTION_PARAMETER = "retention=";
  public static final String REINGEST_SEGMENT_PATH = "/reingestSegment";

  private static final List<String> SUPPORTED_PROTOCOLS = Arrays.asList(HTTP, HTTPS);

@@ -990,6 +993,32 @@ public TableLLCSegmentUploadResponse uploadLLCToSegmentStore(String uri)
    return tableLLCSegmentUploadResponse;
  }

  /**
   * Used by controllers to send requests to servers: the controller periodic task uses this endpoint to ask servers
   * to upload a committed LLC segment to the segment store if it is missing.
   * @param uri The uri to ask servers to upload segment to segment store
   * @return {@link SimpleSegmentMetadata} - segment download url, crc, other metadata
   * @throws URISyntaxException
   * @throws IOException
   * @throws HttpErrorStatusException
   */
  public SimpleSegmentMetadata uploadLLCToSegmentStoreWithZKMetadata(String uri)
      throws URISyntaxException, IOException, HttpErrorStatusException {
    ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1);
    // sendRequest checks the response status code
    SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
        _httpClient.sendRequest(requestBuilder.build(), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
    SimpleSegmentMetadata segmentZKMetadata =
        JsonUtils.stringToObject(response.getResponse(), SimpleSegmentMetadata.class);
    if (segmentZKMetadata.getDownloadUrl() == null || segmentZKMetadata.getDownloadUrl().isEmpty()) {
      throw new HttpErrorStatusException(
          String.format("Returned segment download url is empty after requesting servers to upload by the path: %s",
              uri), response.getStatusCode());
    }
    return segmentZKMetadata;
  }

  /**
   * Send segment uri.
   *
@@ -1248,6 +1277,47 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi
        httpHeaders, maxStreamRateInByte);
  }

  /**
   * Invokes the server's reingestSegment API via a POST request with JSON payload,
   * using Simple HTTP APIs.
   *
   * POST http://[serverURL]/reingestSegment
   * {
   *   "tableNameWithType": [tableName],
   *   "segmentName": [segmentName]
   * }
   */
  public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName)
      throws IOException, URISyntaxException, HttpErrorStatusException {
    String scheme = HTTP;
    if (serverHostPort.contains(HTTPS)) {
      scheme = HTTPS;
      serverHostPort = serverHostPort.replace(HTTPS + "://", "");
    } else if (serverHostPort.contains(HTTP)) {
      serverHostPort = serverHostPort.replace(HTTP + "://", "");
    }

    String serverHost = serverHostPort.split(":")[0];
    String serverPort = serverHostPort.split(":")[1];

    URI reIngestUri = getURI(scheme, serverHost, Integer.parseInt(serverPort), REINGEST_SEGMENT_PATH);

    Map<String, Object> requestJson = new HashMap<>();
    requestJson.put("tableNameWithType", tableNameWithType);
    requestJson.put("segmentName", segmentName);

    String jsonPayload = JsonUtils.objectToString(requestJson);
    SimpleHttpResponse response =
        HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(reIngestUri, jsonPayload));

    // Check that we got a 2xx response
    int statusCode = response.getStatusCode();
    if (statusCode / 100 != 2) {
      throw new IOException(String.format("Failed POST to %s, HTTP %d: %s",
          reIngestUri, statusCode, response.getResponse()));
    }
  }
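Combined with uploadLLCToSegmentStoreWithZKMetadata above, a controller-side caller could drive recovery roughly as in the sketch below. Everything concrete in it is assumed for illustration: the host and port, table, segment name, and the upload URI, as is the try-with-resources over the client.

// Illustrative recovery flow: reingest a committed segment that never made it
// to the deep store, then ask the server to upload it and read back metadata.
try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
  // POSTs {"tableNameWithType": ..., "segmentName": ...} to
  // http://server-1:8097/reingestSegment and throws on a non-2xx response
  client.triggerReIngestion("server-1:8097", "myTable_REALTIME", "myTable__0__42__20250206T0000Z");

  // Hypothetical upload URI; the real path is whatever server endpoint
  // performs the "upload to segment store" request
  SimpleSegmentMetadata metadata = client.uploadLLCToSegmentStoreWithZKMetadata(
      "http://server-1:8097/segments/myTable_REALTIME/myTable__0__42__20250206T0000Z/upload");
  System.out.println("Deep-store copy at: " + metadata.getDownloadUrl());
}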

  /**
   * Generate a param list with a table name attribute.
   *
File: org/apache/pinot/common/utils/PauselessConsumptionUtils.java

@@ -19,7 +19,7 @@
package org.apache.pinot.common.utils;

 import java.util.Optional;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
@@ -39,8 +39,11 @@ private PauselessConsumptionUtils() {
   * @return true if pauseless consumption is explicitly enabled, false otherwise
   * @throws NullPointerException if tableConfig is null
   */
-  public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
-    return Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig)
-        .map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
+  public static boolean isPauselessEnabled(@Nullable TableConfig tableConfig) {
+    return Optional.ofNullable(tableConfig)
+        .map(TableConfig::getIngestionConfig)
+        .map(IngestionConfig::getStreamIngestionConfig)
+        .map(StreamIngestionConfig::isPauselessConsumptionEnabled)
+        .orElse(false);
   }
 }
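The practical effect of switching the parameter to @Nullable, as a short sketch. The TableConfigBuilder usage is illustrative; a fresh realtime config has no ingestion config, so the Optional chain short-circuits to false.

import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

// Both calls return false; neither throws a NullPointerException:
boolean forNullConfig = PauselessConsumptionUtils.isPauselessEnabled(null);
boolean forConfigWithoutStreamIngestion = PauselessConsumptionUtils.isPauselessEnabled(
    new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").build());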
File: PinotHelixResourceManager

@@ -405,6 +405,15 @@ public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
    return _propertyStore;
  }

  /**
   * Get the Pinot LLC realtime segment manager
   *
   * @return Pinot LLC realtime segment manager
   */
  public PinotLLCRealtimeSegmentManager getPinotLLCRealtimeSegmentManager() {
    return _pinotLLCRealtimeSegmentManager;
  }
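A brief illustrative note on why this getter is exposed: callers holding the resource manager can now reach the LLC segment manager directly. The variable name below is hypothetical, and no particular method on the manager is implied.

// e.g., from a controller component that already has the resource manager:
PinotLLCRealtimeSegmentManager llcSegmentManager =
    pinotHelixResourceManager.getPinotLLCRealtimeSegmentManager();
// Recovery hooks such as the ensurePartitionsConsuming flow mentioned in the
// commit history above can then be invoked on llcSegmentManager.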

  /**
   * Get the lineage manager.
   *