
Pauseless Consumption #3: Disaster Recovery with Reingestion #14920

Open · wants to merge 68 commits into base: master
Changes from 51 commits (68 commits total)

Commits:
6defce3
Controller side changes to allow pauseless ingestion:
9aman Dec 19, 2024
54ab7b3
Minor improvements to improve readability
9aman Dec 20, 2024
1e40134
Server side changes to enable pauseless consumption. The changes include
9aman Dec 23, 2024
012da87
Fixes in the commit protocol for pauseless ingestion to complete the …
9aman Dec 24, 2024
a97847f
Changing the way to enable pauseless post introduction of multi-stream…
9aman Dec 30, 2024
2c2ba86
WIP: Changes in the expected state while performing recovery during p…
9aman Dec 30, 2024
4d7c893
Adding changes for ensurePartitionsConsuming function that is utilize…
9aman Dec 30, 2024
a041a75
Add Server side Reingestion API
Dec 30, 2024
b6d0904
run segment level validation
Dec 30, 2024
58f6c51
Add method to trigger reingestion
Dec 30, 2024
d6313b3
Linting fixes
Dec 30, 2024
ca6134a
Adding integration tests for 3 failure scenarios that can occur durin…
9aman Jan 2, 2025
845f616
WIP: Reingestion test
Jan 6, 2025
d2dd313
Fix bug with indexes in reingestion
Jan 6, 2025
fb34fc8
Add tests for reingestion
Jan 8, 2025
50725bd
Fix controller url in Reingestion
Jan 14, 2025
e74d360
Support https and auth in reingestion
Jan 14, 2025
ce3c851
Merge branch 'master' into resolve-failures-pauseless-ingestion
9aman Jan 15, 2025
d6208a6
Removing checks on default crc and replacing them with segment status…
9aman Jan 15, 2025
7f5b720
Formatting improvements
9aman Jan 15, 2025
3f05b2f
Allowing null table config to be passed for checking pauseless consum…
9aman Jan 15, 2025
2012e38
Ensuring upload retries are backward compatible
9aman Jan 15, 2025
7b9da37
Removing unnecessary code
9aman Jan 15, 2025
ded8962
Fixing existing test cases and adding unit tests to check upload to d…
9aman Jan 16, 2025
e703d84
Refactor test cases to reduce repetition
9aman Jan 16, 2025
c2fda4a
Add missing header file
9aman Jan 16, 2025
c836009
Fix reingestion test
Jan 17, 2025
7974ab9
refactoring file upload download client
Jan 17, 2025
c08f841
Removing pauselessConsumptionEnabled from index config
9aman Jan 21, 2025
c4b99bd
Remove reingestion code
Jan 21, 2025
aee514c
Removing check in replaceSegmentIfCrcMismatch
9aman Jan 23, 2025
88a619a
Adding a new class for simple serialization and deserialization of ma…
9aman Jan 24, 2025
791ac21
Removing files related to reingestion tests
9aman Jan 24, 2025
8db5bae
Merging master and including force commit PR changes
9aman Jan 24, 2025
55b2b29
Revert "Remove reingestion code"
Jan 27, 2025
f74df66
Revert "Removing files related to reingestion tests"
Jan 27, 2025
1f4db11
Fix reingestion issue where consuming segments are not replaced
Jan 27, 2025
8e9249c
Copy full segment to deep store before triggering metadata upload
Jan 27, 2025
b804a69
Refactoring: added support for tracking running reingestion jobs
Jan 27, 2025
609942d
Refactor: fix doc comments
Jan 27, 2025
f939714
Make SegmentZKMetadata JSON serializable
Jackie-Jiang Jan 28, 2025
155c49f
Minor API name change
Jackie-Jiang Jan 28, 2025
080ec55
Refactor PinotLLC class to add ability to inject failures
Jan 28, 2025
7e04fa3
Minor improvements
9aman Jan 28, 2025
523913f
Moving ZkMetadaptaUtils to commons and reusing the code in the upload…
9aman Jan 28, 2025
5689333
Fix lint failures
Jan 28, 2025
f42c6b8
Misc fix and cleanup
Jackie-Jiang Jan 29, 2025
a2eebf9
The tests were running slow due to the condition that the IdealState …
9aman Jan 29, 2025
1ba5b6c
Refactor Reingestion integration test
Jan 29, 2025
11aa170
Merge remote-tracking branch 'upstream/master' into pauseless-reinges…
Jan 29, 2025
e84788a
Fix error in tests post rebase
Jan 29, 2025
0d46327
refactoring
Jan 30, 2025
a94c7e3
Remove redundant code
Feb 3, 2025
8b9b8d1
Add support for queue in reingestion
Feb 3, 2025
d035249
Refactoring
Feb 4, 2025
bc8a65b
refactoring
Feb 4, 2025
f082d24
Honour segment build semaphore during reingestion
Feb 4, 2025
d1ad30b
Fix test
Feb 5, 2025
7e47dd5
Add a separate API to upload reingested segments
Feb 5, 2025
5a42c28
Cleanup code
Feb 5, 2025
1551685
Replace reIngest with reingest
Feb 6, 2025
837aa26
Replace reIngest with reingest
Feb 6, 2025
09e8583
Ensure correctness in reingestion
Feb 6, 2025
3d6fdf4
Decouple reingest completion and upload ZK path
Feb 6, 2025
b525c7e
Refactor Stateless segment writer to have least number of arguments
Feb 6, 2025
f7ae25f
Fix segment reset for pauseless tables
Feb 6, 2025
3c17957
Cleaning up Stateless segment writer
Feb 6, 2025
a3fa25c
Rename metric
Feb 6, 2025
@@ -127,6 +127,7 @@ public static FileUploadType getDefaultUploadType() {
private static final String FORCE_CLEANUP_PARAMETER = "&forceCleanup=";

private static final String RETENTION_PARAMETER = "retention=";
public static final String REINGEST_SEGMENT_PATH = "/reingestSegment";

private static final List<String> SUPPORTED_PROTOCOLS = Arrays.asList(HTTP, HTTPS);

@@ -1274,6 +1275,47 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi
httpHeaders, maxStreamRateInByte);
}

/**
* Invokes the server's reingestSegment API via a POST request with a JSON payload,
* using Simple HTTP APIs.
*
* POST http://[serverURL]/reingestSegment
* {
* "tableNameWithType": [tableName],
* "segmentName": [segmentName]
* }
*/
public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName)
throws IOException, URISyntaxException, HttpErrorStatusException {
String scheme = HTTP;
if (serverHostPort.contains(HTTPS)) {
scheme = HTTPS;
serverHostPort = serverHostPort.replace(HTTPS + "://", "");
} else if (serverHostPort.contains(HTTP)) {
serverHostPort = serverHostPort.replace(HTTP + "://", "");
}

String serverHost = serverHostPort.split(":")[0];
String serverPort = serverHostPort.split(":")[1];

URI reIngestUri = getURI(scheme, serverHost, Integer.parseInt(serverPort), REINGEST_SEGMENT_PATH);

Map<String, Object> requestJson = new HashMap<>();
requestJson.put("tableNameWithType", tableNameWithType);
requestJson.put("segmentName", segmentName);

String jsonPayload = JsonUtils.objectToString(requestJson);
SimpleHttpResponse response =
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(reIngestUri, jsonPayload));

// Check that we got a 2xx response
int statusCode = response.getStatusCode();
if (statusCode / 100 != 2) {
throw new IOException(String.format("Failed POST to %s, HTTP %d: %s",
reIngestUri, statusCode, response.getResponse()));
}
}
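The scheme/host/port handling in triggerReIngestion can be sketched in isolation. This is a hypothetical standalone helper, not part of the PR; it checks the https prefix first so that an https URL is never mistaken for http, mirroring the order of the checks above:

```java
public class ServerHostPort {
    // Parse "https://host:port", "http://host:port", or a bare "host:port"
    // into {scheme, host, port}, defaulting the scheme to http.
    static String[] parse(String serverHostPort) {
        String scheme = "http";
        if (serverHostPort.startsWith("https://")) {
            scheme = "https";
            serverHostPort = serverHostPort.substring("https://".length());
        } else if (serverHostPort.startsWith("http://")) {
            serverHostPort = serverHostPort.substring("http://".length());
        }
        // Assumes the port is always present, as the PR's code does
        String[] hostPort = serverHostPort.split(":");
        return new String[]{scheme, hostPort[0], hostPort[1]};
    }

    public static void main(String[] args) {
        String[] parsed = parse("https://server-1:8443");
        System.out.println(parsed[0] + " " + parsed[1] + " " + parsed[2]);
    }
}
```

Using startsWith instead of contains avoids misclassifying a host name that merely contains the substring "http".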

/**
* Generate a param list with a table name attribute.
*
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
@@ -48,8 +49,11 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -442,6 +446,10 @@ public IdealState getIdealState(String realtimeTableName) {
}
}

public ExternalView getExternalView(String realtimeTableName) {
return _helixResourceManager.getTableExternalView(realtimeTableName);
}

@VisibleForTesting
void setIdealState(String realtimeTableName, IdealState idealState) {
try {
@@ -2096,6 +2104,137 @@ URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
}

/**
* Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method
* calls the server's reingestSegment API on one of the alive servers that are supposed to host the segment
* according to IdealState.
*
* API signature:
* POST http://[serverURL]/reingestSegment
* Request body (JSON):
* {
* "tableNameWithType": [tableName],
* "segmentName": [segmentName]
* }
*
* @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
*/
public void reIngestSegmentsWithErrorState(String tableNameWithType) {
// Step 1: Fetch the ExternalView and all segments
ExternalView externalView = getExternalView(tableNameWithType);
IdealState idealState = getIdealState(tableNameWithType);
Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields();
Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields();

// find segments in ERROR state in externalView
List<String> segmentsInErrorState = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
boolean allReplicasInError = true;
Contributor: When there are ONLINE replicas, ideally we should reset the ERROR replica. Do we rely on the validation manager for that?

Contributor (author): Yes, that is already part of the validation manager: https://github.com/apache/pinot/pull/14217/files

Contributor: Can we pull that logic out and handle it differently for pauseless tables? For pauseless tables, reset doesn't really work. Also, that shouldn't be segment-level validation.

for (String state : instanceStateMap.values()) {
Contributor: Do we need to wait for the segment to be in ERROR state on all the servers? What would happen in this case:

  1. Server 1 goes into ERROR state.
  2. Server 2 is able to build it eventually.

Server 1 stays in ERROR state as it is not reset.

Contributor (author): That can happen even in a non-pauseless scenario, right? It should already be handled. Ideally we need to simply reset the segment in this scenario without reingestion.

Contributor (author): This already happens in segment-level validation (https://github.com/apache/pinot/pull/14217/files):

    if (_segmentAutoResetOnErrorAtValidation) {
      _pinotHelixResourceManager.resetSegments(realtimeTableName, null, true);
    }

if (!SegmentStateModel.ERROR.equals(state)) {
allReplicasInError = false;
break;
}
}
if (allReplicasInError) {
segmentsInErrorState.add(segmentName);
}
}

if (segmentsInErrorState.isEmpty()) {
LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType);
return;
}

// Filter out segments that are not ONLINE for every replica in IdealState.
// Use removeIf rather than removing inside the for-each loop, which would
// throw ConcurrentModificationException.
segmentsInErrorState.removeIf(segmentName -> {
Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName);
if (instanceIdealStateMap == null) {
return true;
}
for (String state : instanceIdealStateMap.values()) {
if (!SegmentStateModel.ONLINE.equals(state)) {
return true;
}
}
return false;
});

// Step 2: For each segment, check the ZK metadata for conditions
for (String segmentName : segmentsInErrorState) {
// Skip non-LLC segments or segments missing from the ideal state altogether
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName == null || !segmentToInstanceCurrentStateMap.containsKey(segmentName)) {
continue;
}

SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName);
// We only consider segments in COMMITTING status, indicated by having an endOffset
// but a missing or placeholder download URL
if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
Map<String, String> instanceStateMap = segmentToInstanceIdealStateMap.get(segmentName);

// Step 3: “No peer has that segment.” => Re-ingest from one server that is supposed to host it and is alive
LOGGER.info(
"Segment {} in table {} is COMMITTING with missing download URL and no peer copy. Triggering re-ingestion.",
segmentName, tableNameWithType);

// Find at least one server that should host this segment and is alive
String aliveServer = findAliveServerToReIngest(instanceStateMap.keySet());
if (aliveServer == null) {
LOGGER.warn("No alive server found to re-ingest segment {} in table {}", segmentName, tableNameWithType);
continue;
}

try {
_fileUploadDownloadClient.triggerReIngestion(aliveServer, tableNameWithType, segmentName);
LOGGER.info("Successfully triggered reIngestion for segment {} on server {}", segmentName, aliveServer);
} catch (Exception e) {
LOGGER.error("Failed to call reIngestSegment for segment {} on server {}", segmentName, aliveServer, e);
}
} else if (segmentZKMetadata.getStatus() == Status.UPLOADED) {
LOGGER.info(
"Segment {} in table {} is in ERROR state with download URL present. Resetting segment to ONLINE state.",
segmentName, tableNameWithType);
_helixResourceManager.resetSegment(tableNameWithType, segmentName, null);
Contributor: Reset segment does not work when the SegmentDataManager is missing on the server. Consider the following scenario: a segment has a missing URL, the server hosting it restarts, and the segment goes into ERROR state in EV. Re-ingestion updates the ZK metadata and the reset-segment message is sent, but the server has no SegmentDataManager instance for the segment, so the reset does not work.

  protected void doReplaceSegment(String segmentName)
      throws Exception {
    SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
    if (segmentDataManager != null) {
      SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
      IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
      indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
      replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig);
    } else {
      _logger.warn("Failed to find segment: {}, skipping replacing it", segmentName);
    }
  }

I ran the above code and found the following error:
[upsertMeetupRsvp_with_dr_2_REALTIME-RealtimeTableDataManager] [HelixTaskExecutor-message_handle_thread_40] Failed to find segment: upsertMeetupRsvp_with_dr_2__0__57__20250127T0745Z, skipping replacing it
}
}
}
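The EV/IS scan above can be condensed into a standalone sketch. This is a hypothetical illustration using plain string states instead of the Helix model classes, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ErrorSegmentSelector {
    // Select segments whose replicas are all ERROR in the external view
    // while the ideal state expects every replica to be ONLINE.
    static List<String> selectForReingestion(Map<String, Map<String, String>> externalView,
            Map<String, Map<String, String>> idealState) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : externalView.entrySet()) {
            boolean allReplicasInError = entry.getValue().values().stream().allMatch("ERROR"::equals);
            Map<String, String> idealInstanceStates = idealState.get(entry.getKey());
            boolean allOnlineInIdealState = idealInstanceStates != null
                && idealInstanceStates.values().stream().allMatch("ONLINE"::equals);
            if (allReplicasInError && allOnlineInIdealState) {
                candidates.add(entry.getKey());
            }
        }
        return candidates;
    }
}
```

A segment with even one non-ERROR replica is left alone here, matching the method's intent of re-ingesting only when no server holds a good copy.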

/**
* Picks one 'alive' server among a set of servers that are supposed to host the segment,
* e.g. by checking if Helix says it is enabled or if it appears in the live instance list.
* This is a simple example; adapt to your environment’s definition of “alive.”
*/
private String findAliveServerToReIngest(Set<String> candidateServers) {
// Get the current live instances from Helix
HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
try {
// This should ideally handle https scheme as well
BiMap<String, String> instanceToEndpointMap =
_helixResourceManager.getDataInstanceAdminEndpoints(candidateServers);

if (instanceToEndpointMap.isEmpty()) {
LOGGER.warn("No instance data admin endpoints found for servers: {}", candidateServers);
return null;
}

for (String server : candidateServers) {
if (liveInstances.contains(server)) {
return instanceToEndpointMap.get(server);
}
}
} catch (Exception e) {
LOGGER.warn("Failed to get Helix instance data admin endpoints for servers: {}", candidateServers, e);
}
return null;
}

public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
Set<String> segmentsYetToBeCommitted = new HashSet<>();
for (String segmentName: segmentsToCheck) {
@@ -28,6 +28,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
@@ -176,6 +177,12 @@ private void runSegmentLevelValidation(TableConfig tableConfig) {
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata));

boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);

if (isPauselessConsumptionEnabled) {
_llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName());
}

// Check missing segments and upload them to the deep store
if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
_llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata);
@@ -56,6 +56,7 @@
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -389,15 +390,21 @@ protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager
IndexLoadingConfig indexLoadingConfig)
throws Exception {
String segmentName = segmentDataManager.getSegmentName();
Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
"Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);
SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
if (hasSameCRC(zkMetadata, localMetadata)) {
_logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc());
return;
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
// For pauseless tables, we should replace the segment if download url is missing even if crc is same
// Without this the reingestion of ERROR segments in pauseless tables fails
// as the segment data manager is still an instance of RealtimeSegmentDataManager
if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
Contributor: I am not able to come up with a better approach to achieve this, but this does not seem like the right way to cater to re-ingestion.

Contributor: @noob-se7en Can you please review this part once? Feel free to post any questions regarding the context.

Contributor (author): This check was added in the following PR: https://github.com/apache/pinot/pull/12886/files. What problems do you see with this?

Contributor: When will the CRC match? We shouldn't replace a segment multiple times (e.g. somehow 2 servers trying to re-ingest).

Contributor (author): Yeah, I am also not happy with this. The thing I am trying to solve for is making segment refresh succeed for reingestion. Without this check it fails on the following line:

   Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
          "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);

Segment refresh is triggered whenever a segment is uploaded, which is what reingestion is doing.

Contributor: We probably need a new API to handle re-ingested segments. It is not a regular segment push. With a new API we can also set the status to DONE, and then trigger the reset from the API.

Contributor (author): Cool, added a new API called /segment/completeReingestion that handles this.

Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
"Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);
SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
if (hasSameCRC(zkMetadata, localMetadata)) {
_logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc());
return;
}
_logger.info("Replacing segment: {} because its CRC has changed from: {} to: {}", segmentName,
localMetadata.getCrc(), zkMetadata.getCrc());
}
_logger.info("Replacing segment: {} because its CRC has changed from: {} to: {}", segmentName,
localMetadata.getCrc(), zkMetadata.getCrc());
downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
_logger.info("Replaced segment: {} with new CRC: {}", segmentName, zkMetadata.getCrc());
}
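The branch above boils down to a small decision rule; a hedged sketch follows (a hypothetical helper, not the PR's code):

```java
public class ReplaceDecision {
    // For pauseless tables the segment is always replaced, even on a CRC
    // match, so a reingested copy can displace the consuming one; for
    // regular tables only a CRC mismatch triggers replacement.
    static boolean shouldReplace(boolean pauselessEnabled, long zkCrc, long localCrc) {
        return pauselessEnabled || zkCrc != localCrc;
    }
}
```

This captures why the CRC short-circuit is skipped for pauseless tables: the local segment may still be a consuming one that must be swapped regardless of CRC.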
@@ -1056,7 +1056,7 @@ AtomicBoolean getAcquiredConsumerSemaphore() {
}

@VisibleForTesting
SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
// for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
// to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
// complete.
@@ -1243,7 +1243,7 @@ protected boolean buildSegmentAndReplace()
return true;
}

private void closeStreamConsumers() {
protected void closeStreamConsumers() {
closePartitionGroupConsumer();
closePartitionMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
@@ -549,9 +549,8 @@ private void doAddConsumingSegment(String segmentName)
_tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
RealtimeSegmentDataManager realtimeSegmentDataManager =
new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig,
schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, semaphore,
partitionUpsertMetadataManager, partitionDedupMetadataManager, _isTableReadyToConsumeData);
registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager);
if (partitionUpsertMetadataManager != null) {
partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
@@ -638,6 +637,16 @@ private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) {
}).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS);
}

protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName,
Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, isTableReadyToConsumeData);
}

/**
* Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to
* manage the segments, so its values have to be within the valid range.
@@ -100,6 +100,14 @@ public AuthProvider getAuthProvider() {
return _authProvider;
}

public String getProtocol() {
return _protocol;
}

public Integer getControllerHttpsPort() {
return _controllerHttpsPort;
}

public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
SegmentCompletionProtocol.SegmentCommitStartRequest request =
new SegmentCompletionProtocol.SegmentCommitStartRequest(params);