diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 99bd89206607..415acc0596b6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -180,7 +180,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // segment when the partition is first detected). COMMITTING_SEGMENT_SIZE("committingSegmentSize", false), - TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false); + TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false), + + // Number of reingested segments getting uploaded + SEGMENT_REINGESTION_UPLOAD_IN_PROGRESS("segmentReingestionUploadInProgress", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 6936e6dcb662..6c9787222aee 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -127,6 +127,7 @@ public static FileUploadType getDefaultUploadType() { private static final String FORCE_CLEANUP_PARAMETER = "&forceCleanup="; private static final String RETENTION_PARAMETER = "retention="; + public static final String SEGMENT_REINGEST_COMPLETION_PATH = "/segments/reingested"; private static final List SUPPORTED_PROTOCOLS = Arrays.asList(HTTP, HTTPS); @@ -369,6 +370,12 @@ public static URI getUploadSegmentURI(URI controllerURI) return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH); } + public static URI getSegmentReingestCompletionURI(URI controllerURI) + throws URISyntaxException { + return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), + SEGMENT_REINGEST_COMPLETION_PATH); + } + public static URI getBatchSegmentUploadURI(URI controllerURI) throws URISyntaxException { return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index cf8c2bf956f3..4b4cf49c3e0e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -233,6 +233,13 @@ public Response downloadSegment( private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) { + return uploadSegment(tableName, tableType, multiPart, copySegmentToFinalLocation, enableParallelPushProtection, + allowRefresh, false, headers, request); + } + + private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, + @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection, + boolean 
allowRefresh, boolean isReingested, HttpHeaders headers, Request request) { if (StringUtils.isNotEmpty(tableName)) { TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); if (tableTypeFromTableName != null && tableTypeFromTableName != tableType) { @@ -409,9 +416,16 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, - segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, - enableParallelPushProtection, allowRefresh, headers); + + if (!isReingested) { + zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, + segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, + enableParallelPushProtection, allowRefresh, headers); + } else { + zkOperator.completeReingestedSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, + sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, + headers); + } return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { @@ -965,6 +979,41 @@ public Response revertReplaceSegments( } } + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Path("segments/reingested") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.UPLOAD_SEGMENT) + @Authenticate(AccessType.CREATE) + @ApiOperation(value = "Reingest a realtime segment", notes = "Reingest a segment as multipart file") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully reingested segment"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 412, message = "CRC check fails"), + @ApiResponse(code = 500, message = "Internal error") + }) + @TrackInflightRequestMetrics + @TrackedByGauge(gauge = ControllerGauge.SEGMENT_REINGESTION_UPLOAD_IN_PROGRESS) + public void completeSegmentReingestion(FormDataMultiPart multiPart, + @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) + String tableName, + @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) + @DefaultValue("REALTIME") String tableType, + @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") + @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) + boolean enableParallelPushProtection, + @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { + try { + asyncResponse.resume(uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, + true, enableParallelPushProtection, false, true, headers, request)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + private static void createSegmentFileFromMultipart(FormDataMultiPart multiPart, File destFile) throws IOException { // 
Read segment file or segment metadata file and directly use that information to update zk diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index 4da9c72a5665..2b6e8717a8a0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -100,18 +100,20 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata headers); } else { // Refresh an existing segment - if (!allowRefresh) { + + if (!(allowRefresh)) { // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the check // done up-front but ends up getting created before the check here, we could incorrectly refresh an existing // segment. throw new ControllerApplicationException(LOGGER, - String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, - tableNameWithType), Response.Status.CONFLICT); + String.format("Segment: %s already exists in table: %s. Neither refresh nor reset not permitted.", + segmentName, tableNameWithType), Response.Status.CONFLICT); } + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); - processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord, - finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, - segmentSizeInBytes, enableParallelPushProtection, headers); + processExistingSegmentWithRefresh(tableNameWithType, segmentMetadata, uploadType, + existingSegmentMetadataZNRecord, finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, + segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, headers); } } @@ -170,6 +172,29 @@ public void completeSegmentsOperations(String tableNameWithType, FileUploadType processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList); } + public void completeReingestedSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, + @Nullable URI finalSegmentLocationURI, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, + @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) + throws Exception { + String segmentName = segmentMetadata.getName(); + + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, + existingSegmentMetadataZNRecord, enableParallelPushProtection)) { + LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", + tableNameWithType, segmentName); + Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), + "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); + existingSegmentMetadataZNRecord = null; + } + + LOGGER.info("Segment: {} already exists in table: {}, resetting it", segmentName, tableNameWithType); + processReingestedSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord, + finalSegmentLocationURI, sourceDownloadURIStr, 
segmentDownloadURIStr, crypterName, segmentSizeInBytes, + enableParallelPushProtection, headers); + } + /** * Returns {@code true} when the segment should be processed as new segment. *

When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after @@ -208,7 +233,7 @@ private void handleParallelPush(String tableNameWithType, String segmentName, lo } } - private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata, + private void processExistingSegmentWithRefresh(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) @@ -474,6 +499,76 @@ private void processExistingSegments(String tableNameWithType, FileUploadType up } } + private void processReingestedSegment(String tableNameWithType, SegmentMetadata segmentMetadata, + ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, + @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, + long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) + throws Exception { + String segmentName = segmentMetadata.getName(); + int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); + + // Check if CRC match when IF-MATCH header is set + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingCrc = segmentZKMetadata.getCrc(); + checkCRC(headers, tableNameWithType, segmentName, existingCrc); + + // Check segment upload start time when parallel push protection enabled + if (enableParallelPushProtection) { + // When segment upload start time is larger than 0, that means another upload is in progress + long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + if (segmentUploadStartTime > 0) { + handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime); + } + + // Lock the segment by setting the upload start time in ZK + segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), + Response.Status.CONFLICT); + } else { + // The version will increment if the zk metadata update is successful + expectedVersion++; + } + } + + // Reset segment upload start time to unlock the segment later + // NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case + // previous segment upload did not finish properly and the parallel push protection is turned off + segmentZKMetadata.setSegmentUploadStartTime(-1); + + try { + // Update ZK metadata + if (finalSegmentLocationURI != null) { + copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI); + LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + finalSegmentLocationURI); + } + + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); + + // using committing segment zk metadata so that Status is shown as DONE instead of UPLOADED + SegmentZKMetadataUtils.updateCommittingSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + 
segmentDownloadURIStr, segmentSizeInBytes, segmentZKMetadata.getEndOffset()); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType); + + // Send a message to servers hosting the table to reset the segment + _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, null); + } catch (Exception e) { + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", segmentName, + tableNameWithType, expectedVersion); + } + throw e; + } + } + private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) { String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH); if (expectedCrcStr != null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 8f5382ae5ed2..9f91dd9b08ec 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -21,10 +21,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; @@ -50,11 +52,13 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.messages.ForceCommitMessage; import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -71,6 +75,7 @@ import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.Constants; @@ -168,6 +173,8 @@ public class PinotLLCRealtimeSegmentManager { * deep store fix if necessary. RetentionManager will delete this kind of segments shortly anyway. 
*/ private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS = 60 * 60 * 1000L; + private static final String REINGEST_SEGMENT_PATH = "/reingestSegment"; + // 1 hour private static final Random RANDOM = new Random(); @@ -2096,6 +2103,156 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reingestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reingestSegment/[segmentName] + * Request body (JSON): + * + * If segment is in ERROR state in only few replicas but has download URL, we instead trigger a segment reset + * @param realtimeTableName The table name with type, e.g. "myTable_REALTIME" + */ + public void repairSegmentsInErrorState(String realtimeTableName) { + // Step 1: Fetch the ExternalView and all segments + ExternalView externalView = _helixResourceManager.getTableExternalView(realtimeTableName); + if (externalView == null) { + LOGGER.warn("External view not found for table {}", realtimeTableName); + return; + } + IdealState idealState = getIdealState(realtimeTableName); + Map> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); + Map> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + + // find segments in ERROR state in externalView + List segmentsInErrorStateInAllReplicas = new ArrayList<>(); + List segmentsInErrorStateInAtleastOneReplica = new ArrayList<>(); + for (Map.Entry> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + + // Skip non-LLC segments or segments missing from the ideal state altogether + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName == null) { + continue; + } + + Map instanceStateMap = entry.getValue(); + int numReplicasInError = 0; + for (String state : instanceStateMap.values()) { + if (SegmentStateModel.ERROR.equals(state)) { + numReplicasInError++; + } + } + + if (numReplicasInError > 0) { + segmentsInErrorStateInAtleastOneReplica.add(segmentName); + } + + if (numReplicasInError == instanceStateMap.size()) { + segmentsInErrorStateInAllReplicas.add(segmentName); + } + } + + if (segmentsInErrorStateInAtleastOneReplica.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", realtimeTableName); + return; + } + + // filter out segments that are not ONLINE in IdealState + for (String segmentName : segmentsInErrorStateInAtleastOneReplica) { + Map instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { + if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; + } + } + if (!isOnline) { + continue; + } + + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName); + // We only consider segments that are in COMMITTING state for reingestion + if (segmentsInErrorStateInAllReplicas.contains(segmentName) + && segmentZKMetadata.getStatus() == Status.COMMITTING) { + Map instanceStateMap = segmentToInstanceIdealStateMap.get(segmentName); + + // Step 3: “No peer has that segment.” => Re-ingest from one server that is supposed to host it and is alive + LOGGER.info( + 
"Segment {} in table {} is COMMITTING with missing download URL and no peer copy. Triggering re-ingestion.", + segmentName, realtimeTableName); + + // Find at least one server that should host this segment and is alive + String aliveServer = pickServerToReingest(instanceStateMap.keySet()); + if (aliveServer == null) { + LOGGER.warn("No alive server found to re-ingest segment {} in table {}", segmentName, realtimeTableName); + continue; + } + + try { + triggerReingestion(aliveServer, segmentName); + LOGGER.info("Successfully triggered reingestion for segment {} on server {}", segmentName, aliveServer); + } catch (Exception e) { + LOGGER.error("Failed to call reingestSegment for segment {} on server {}", segmentName, aliveServer, e); + } + } else if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) { + // trigger segment reset for this since it is not in IN_PROGRESS state so download URL must be present + _helixResourceManager.resetSegment(realtimeTableName, segmentName, null); + } + } + } + + /** + * Invokes the server's reingestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. + * + * POST http://[serverURL]/reingestSegment/[segmentName] + */ + private void triggerReingestion(String serverHostPort, String segmentName) + throws IOException, URISyntaxException, HttpErrorStatusException { + String scheme = CommonConstants.HTTP_PROTOCOL; + if (serverHostPort.contains(CommonConstants.HTTPS_PROTOCOL)) { + scheme = CommonConstants.HTTPS_PROTOCOL; + serverHostPort = serverHostPort.replace(CommonConstants.HTTPS_PROTOCOL + "://", ""); + } else if (serverHostPort.contains(CommonConstants.HTTP_PROTOCOL)) { + serverHostPort = serverHostPort.replace(CommonConstants.HTTP_PROTOCOL + "://", ""); + } + + String serverHost = serverHostPort.split(":")[0]; + String serverPort = serverHostPort.split(":")[1]; + + URI reingestUri = FileUploadDownloadClient.getURI(scheme, serverHost, Integer.parseInt(serverPort), + REINGEST_SEGMENT_PATH + "/" + segmentName); + HttpClient.wrapAndThrowHttpException(HttpClient.getInstance().sendJsonPostRequest(reingestUri, "")); + } + + /** + * Picks one server among a set of servers that are supposed to host the segment, + */ + private String pickServerToReingest(Set candidateServers) { + try { + List serverList = new ArrayList<>(candidateServers); + String server = serverList.get(RANDOM.nextInt(serverList.size())); + // This should ideally handle https scheme as well + BiMap instanceToEndpointMap = + _helixResourceManager.getDataInstanceAdminEndpoints(candidateServers); + + if (instanceToEndpointMap.isEmpty()) { + LOGGER.warn("No instance data admin endpoints found for servers: {}", candidateServers); + return null; + } + + // return random server + return instanceToEndpointMap.get(server); + } catch (Exception e) { + LOGGER.warn("Failed to get Helix instance data admin endpoints for servers: {}", candidateServers, e); + } + return null; + } + public Set getSegmentsYetToBeCommitted(String tableNameWithType, Set segmentsToCheck) { Set segmentsYetToBeCommitted = new HashSet<>(); for (String segmentName: segmentsToCheck) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 5ea86bd9fb42..7a9596b938a6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -28,6 +28,7 @@ import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.resources.PauseStatusDetails; @@ -120,6 +121,15 @@ protected void processTable(String tableNameWithType, Context context) { } else { LOGGER.info("Skipping segment-level validation for table: {}", tableConfig.getTableName()); } + + boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + if (isPauselessConsumptionEnabled) { + _llcRealtimeSegmentManager.repairSegmentsInErrorState(tableConfig.getTableName()); + } else if (_segmentAutoResetOnErrorAtValidation) { + // reset for pauseless tables is already handled in repairSegmentsInErrorState method + // with additional checks for pauseless consumption + _pinotHelixResourceManager.resetSegments(tableConfig.getTableName(), null, true); + } } /** @@ -180,10 +190,6 @@ private void runSegmentLevelValidation(TableConfig tableConfig) { if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) { _llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, segmentsZKMetadata); } - - if (_segmentAutoResetOnErrorAtValidation) { - _pinotHelixResourceManager.resetSegments(realtimeTableName, null, true); - } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 8c030f776626..23af663545b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -389,6 +389,10 @@ protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager IndexLoadingConfig indexLoadingConfig) throws Exception { String segmentName = segmentDataManager.getSegmentName(); + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + // For pauseless tables, we should replace the segment if download url is missing even if crc is same + // Without this the reingestion of ERROR segments in pauseless tables fails + // as the segment data manager is still an instance of RealtimeSegmentDataManager Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager, "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType); SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index dbb8a6b9da49..73a1c3b4f849 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1056,7 +1056,7 @@ AtomicBoolean getAcquiredConsumerSemaphore() { } @VisibleForTesting - SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { + protected SegmentBuildDescriptor 
buildSegmentInternal(boolean forCommit) { // for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload() // to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is // complete. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index f4ca188ba277..73c8ca0adb05 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -549,9 +549,8 @@ private void doAddConsumingSegment(String segmentName) _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; RealtimeSegmentDataManager realtimeSegmentDataManager = - new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, - schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, - partitionDedupMetadataManager, _isTableReadyToConsumeData); + createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, semaphore, + partitionUpsertMetadataManager, partitionDedupMetadataManager, _isTableReadyToConsumeData); registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager); if (partitionUpsertMetadataManager != null) { partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); @@ -638,6 +637,17 @@ private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) { }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS); } + @VisibleForTesting + protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata, + TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, + Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData) + throws AttemptsExceededException, RetriableOperationException { + return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), + indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, + partitionDedupMetadataManager, isTableReadyToConsumeData); + } + /** * Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to * manage the segments, so its values have to be within the valid range. 
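The controller-side decision implemented by repairSegmentsInErrorState above boils down to this: re-ingest only when every replica of a COMMITTING segment is in ERROR state (so there is no peer copy or download URL to recover from), and fall back to a plain segment reset otherwise. A minimal, self-contained sketch of that rule, using plain maps in place of the Helix ExternalView/IdealState objects (every name below is an illustrative stand-in, not Pinot code):

import java.util.Map;

public class RepairDecisionSketch {
  enum Action { REINGEST, RESET, SKIP }

  // externalViewStates: instance -> state from the ExternalView (e.g. "ONLINE", "ERROR")
  // idealStates: instance -> state from the IdealState
  // zkStatus: the segment's ZK metadata status (e.g. "COMMITTING", "IN_PROGRESS", "DONE")
  static Action decide(Map<String, String> externalViewStates, Map<String, String> idealStates, String zkStatus) {
    long replicasInError = externalViewStates.values().stream().filter("ERROR"::equals).count();
    if (replicasInError == 0) {
      return Action.SKIP;
    }
    // Only segments that IdealState expects to be fully ONLINE are candidates for repair.
    boolean onlineInIdealState = idealStates.values().stream().allMatch("ONLINE"::equals);
    if (!onlineInIdealState) {
      return Action.SKIP;
    }
    if (replicasInError == externalViewStates.size() && "COMMITTING".equals(zkStatus)) {
      // No replica has a usable copy and the commit never finished: re-ingest from the stream.
      return Action.REINGEST;
    }
    if (!"IN_PROGRESS".equals(zkStatus)) {
      // At least one good copy (or a download URL) should exist: a reset is enough.
      return Action.RESET;
    }
    return Action.SKIP;
  }

  public static void main(String[] args) {
    Map<String, String> ev = Map.of("server1", "ERROR", "server2", "ERROR");
    Map<String, String> is = Map.of("server1", "ONLINE", "server2", "ONLINE");
    System.out.println(decide(ev, is, "COMMITTING")); // prints REINGEST
  }
}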
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java index 6e1e24a688be..b94c9b5bcca4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java @@ -21,20 +21,41 @@ import java.io.File; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.UUID; import javax.net.ssl.SSLContext; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.ClientSSLContextGenerator; import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.common.utils.http.HttpClientConfig; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader; import org.apache.pinot.core.util.SegmentCompletionProtocolUtils; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +121,14 @@ public AuthProvider getAuthProvider() { return _authProvider; } + public String getProtocol() { + return _protocol; + } + + public Integer getControllerHttpsPort() { + return _controllerHttpsPort; + } + public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) { SegmentCompletionProtocol.SegmentCommitStartRequest request = new SegmentCompletionProtocol.SegmentCommitStartRequest(params); @@ -250,4 +279,139 @@ private SegmentCompletionProtocol.Response sendCommitEndWithMetadataFiles(String SegmentCompletionProtocolUtils.raiseSegmentCompletionProtocolResponseMetric(_serverMetrics, response); return response; } + + public void uploadReingestedSegment(String segmentName, String segmentStoreUri, File segmentTarFile) + throws Exception { + List

<Header> headers = AuthProviderUtils.toRequestHeaders(_authProvider); + + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + String rawTableName = llcSegmentName.getTableName(); + String destUriStr = StringUtil.join(File.separator, segmentStoreUri, rawTableName, + SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)); + + try (PinotFS pinotFS = PinotFSFactory.create(new URI(segmentStoreUri).getScheme())) { + URI destUri = new URI(destUriStr); + if (pinotFS.exists(destUri)) { + pinotFS.delete(destUri, true); + } + pinotFS.copyFromLocalFile(segmentTarFile, destUri); + } + + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, destUriStr)); + String controllerUrl = getControllerUrl(rawTableName); + triggerSegmentReingestCompletion(llcSegmentName, controllerUrl, segmentTarFile, headers); + } + + /** + * Trigger the segment reingest completion protocol to the controller. + * + * @param llcSegmentName The LLC segment name + * @param controllerUrl The base URL of the Pinot Controller (e.g., "http://controller-host:9000") + * @param segmentFile The local segment tar.gz file + * @param authHeaders A list of authentication or additional headers for the request + */ + public void triggerSegmentReingestCompletion(LLCSegmentName llcSegmentName, String controllerUrl, File segmentFile, + List<Header>
authHeaders) + throws Exception { + String segmentName = llcSegmentName.getSegmentName(); + String rawTableName = llcSegmentName.getTableName(); + + LOGGER.info("Pushing metadata of segment {} of table {} to controller: {}", segmentFile.getName(), + rawTableName, controllerUrl); + File segmentMetadataFile = generateSegmentMetadataTar(segmentFile); + + LOGGER.info("Generated segment metadata tar file: {}", segmentMetadataFile.getAbsolutePath()); + try { + // Prepare headers + List<Header>
headers = authHeaders; + + // The upload type must be METADATA + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); + + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, "true")); + + // Set table name parameter + List parameters = getSegmentPushCommonParams(rawTableName); + + URI reingestCompletionURI = FileUploadDownloadClient.getSegmentReingestCompletionURI(new URI(controllerUrl)); + + LOGGER.info("Uploading segment metadata to: {} with headers: {}", reingestCompletionURI, headers); + + // Perform the metadata upload + SimpleHttpResponse response = _fileUploadDownloadClient + .uploadSegmentMetadata(reingestCompletionURI, segmentName, segmentMetadataFile, headers, parameters, + HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + + LOGGER.info("Response for pushing metadata of segment {} of table {} to {} - {}: {}", segmentName, rawTableName, + controllerUrl, response.getStatusCode(), response.getResponse()); + } finally { + FileUtils.deleteQuietly(segmentMetadataFile); + } + } + + private List getSegmentPushCommonParams(String rawTableName) { + List params = new ArrayList<>(); + params.add( + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true")); + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, rawTableName)); + params.add( + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, TableType.REALTIME.toString())); + return params; + } + + /** + * Generate a tar.gz file containing only the metadata files (metadata.properties, creation.meta) + * from a given Pinot segment tar.gz file. + */ + private File generateSegmentMetadataTar(File segmentTarFile) + throws Exception { + + if (!segmentTarFile.exists()) { + throw new IllegalArgumentException("Segment tar file does not exist: " + segmentTarFile.getAbsolutePath()); + } + + LOGGER.info("Generating segment metadata tar file from segment tar: {}", segmentTarFile.getAbsolutePath()); + File tempDir = Files.createTempDirectory("pinot-segment-temp").toFile(); + String uuid = UUID.randomUUID().toString(); + try { + File metadataDir = new File(tempDir, "segmentMetadataDir-" + uuid); + if (!metadataDir.mkdirs()) { + throw new RuntimeException("Failed to create metadata directory: " + metadataDir.getAbsolutePath()); + } + + LOGGER.info("Trying to untar Metadata file from: [{}] to [{}]", segmentTarFile, metadataDir); + TarCompressionUtils.untarOneFile(segmentTarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME, + new File(metadataDir, V1Constants.MetadataKeys.METADATA_FILE_NAME)); + + // Extract creation.meta + LOGGER.info("Trying to untar CreationMeta file from: [{}] to [{}]", segmentTarFile, metadataDir); + TarCompressionUtils.untarOneFile(segmentTarFile, V1Constants.SEGMENT_CREATION_META, + new File(metadataDir, V1Constants.SEGMENT_CREATION_META)); + + File segmentMetadataFile = + new File(FileUtils.getTempDirectory(), "segmentMetadata-" + UUID.randomUUID() + ".tar.gz"); + TarCompressionUtils.createCompressedTarFile(metadataDir, segmentMetadataFile); + return segmentMetadataFile; + } finally { + FileUtils.deleteQuietly(tempDir); + } + } + + public String getControllerUrl(String rawTableName) { + ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance(); + final Pair leaderHostPort = leaderLocator.getControllerLeader(rawTableName); + if (leaderHostPort == null) { + LOGGER.warn("No leader found 
for table: {}", rawTableName); + return null; + } + Integer port = leaderHostPort.getRight(); + String protocol = _protocol; + if (_controllerHttpsPort != null) { + port = _controllerHttpsPort; + protocol = HTTPS_PROTOCOL; + } + + return URIUtils.buildURI(protocol, leaderHostPort.getLeft() + ":" + port, "", Collections.emptyMap()).toString(); + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java index 5d7f7caa4a08..b5ab41e55e58 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java @@ -26,11 +26,9 @@ import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.controller.BaseControllerStarter; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig; import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter; @@ -45,7 +43,6 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -75,7 +72,7 @@ public abstract class BasePauselessRealtimeIngestionTest extends BaseClusterInte @Override public BaseControllerStarter createControllerStarter() { - return new FakePauselessControllerStarter(); + return new FailureInjectingControllerStarter(); } @Override @@ -110,7 +107,7 @@ public void setUp() startController(); startBroker(); startServer(); - + setMaxSegmentCompletionTimeMillis(); setupNonPauselessTable(); injectFailure(); setupPauselessTable(); @@ -165,6 +162,14 @@ private void setupPauselessTable() addTableConfig(tableConfig); } + private void setMaxSegmentCompletionTimeMillis() { + PinotLLCRealtimeSegmentManager realtimeSegmentManager = _helixResourceManager.getRealtimeSegmentManager(); + if (realtimeSegmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) { + ((FailureInjectingPinotLLCRealtimeSegmentManager) realtimeSegmentManager) + .setMaxSegmentCompletionTimeoutMs(MAX_SEGMENT_COMPLETION_TIME_MILLIS); + } + } + protected void injectFailure() { PinotLLCRealtimeSegmentManager realtimeSegmentManager = _helixResourceManager.getRealtimeSegmentManager(); if (realtimeSegmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) { @@ -262,35 +267,4 @@ private boolean hasSegmentsInStatus(List segmentZKMetadataLis } return false; } - - private static class FakePauselessLLCRealtimeSegmentManager extends FailureInjectingPinotLLCRealtimeSegmentManager { - - public FakePauselessLLCRealtimeSegmentManager( - PinotHelixResourceManager helixResourceManager, - ControllerConf controllerConf, ControllerMetrics controllerMetrics) { - super(helixResourceManager, controllerConf, controllerMetrics); - } - 
- @Override - protected boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, - long currentTimeMs) { - Stat stat = new Stat(); - getSegmentZKMetadata(realtimeTableName, segmentName, stat); - if (currentTimeMs > stat.getMtime() + MAX_SEGMENT_COMPLETION_TIME_MILLIS) { - LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", - segmentName, MAX_SEGMENT_COMPLETION_TIME_MILLIS, stat.getMtime(), currentTimeMs); - return true; - } else { - return false; - } - } - } - - private static class FakePauselessControllerStarter extends FailureInjectingControllerStarter { - - @Override - protected PinotLLCRealtimeSegmentManager createPinotLLCRealtimeSegmentManager() { - return new FakePauselessLLCRealtimeSegmentManager(_helixResourceManager, getConfig(), _controllerMetrics); - } - } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java new file mode 100644 index 000000000000..b924178bcbab --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.BaseControllerStarter; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig; +import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter; +import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; + + +public class PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClusterIntegrationTest { + + private static final int NUM_REALTIME_SEGMENTS = 48; + protected static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 10_000; // 5 MINUTES + private static final Logger LOGGER = + LoggerFactory.getLogger(PauselessRealtimeIngestionSegmentCommitFailureTest.class); + private static final String DEFAULT_TABLE_NAME_2 = DEFAULT_TABLE_NAME + "_2"; + private List _avroFiles; + + protected void overrideControllerConf(Map properties) { + properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true); + properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true); + properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless", + "org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM"); + // Set the delay more than the time we sleep before triggering RealtimeSegmentValidationManager manually, i.e. + // MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level validations are performed. + properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, + 500); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + // Set segment store uri to the one used by controller as data dir (i.e. 
deep store) + try { + LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(), + new URI(_controllerConfig.getDataDir()).getScheme()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir()); + serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE, + "true"); + serverConf.setProperty("pinot.server.instance." + CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS, + "org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider"); + } + + @Override + public BaseControllerStarter createControllerStarter() { + return new FailureInjectingControllerStarter(); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + // Start a customized controller with more frequent realtime segment validation + startController(); + startBroker(); + startServer(); + + // load data in kafka + _avroFiles = unpackAvroData(_tempDir); + startKafka(); + pushAvroIntoKafka(_avroFiles); + + setMaxSegmentCompletionTimeMillis(); + // create schema for non-pauseless table + Schema schema = createSchema(); + schema.setSchemaName(DEFAULT_TABLE_NAME_2); + addSchema(schema); + + // add non-pauseless table + TableConfig tableConfig2 = createRealtimeTableConfig(_avroFiles.get(0)); + tableConfig2.setTableName(DEFAULT_TABLE_NAME_2); + tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS"); + tableConfig2.getValidationConfig().setRetentionTimeValue("100000"); + addTableConfig(tableConfig2); + + // Ensure that the commit protocol for all the segments have completed before injecting failure + waitForDocsLoaded(600_000L, true, tableConfig2.getTableName()); + TestUtils.waitForCondition((aVoid) -> { + List segmentZKMetadataList = + _helixResourceManager.getSegmentsZKMetadata(tableConfig2.getTableName()); + return assertUrlPresent(segmentZKMetadataList); + }, 1000, 100000, "Some segments still have missing url"); + + // create schema for pauseless table + schema.setSchemaName(DEFAULT_TABLE_NAME); + addSchema(schema); + + // add pauseless table + TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); + tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS"); + tableConfig.getValidationConfig().setRetentionTimeValue("100000"); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()))); + ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true); + Map streamConfigMap = ingestionConfig.getStreamIngestionConfig() + .getStreamConfigMaps() + .get(0); + streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless"); + streamConfigMap.put("segmentDownloadTimeoutMinutes", "1"); + tableConfig.getIndexingConfig().setStreamConfigs(null); + tableConfig.setIngestionConfig(ingestionConfig); + + addTableConfig(tableConfig); + Thread.sleep(60000L); + TestUtils.waitForCondition( + (aVoid) -> atLeastOneErrorSegmentInExternalView(TableNameBuilder.REALTIME.tableNameWithType(getTableName())), + 1000, 600000, "Segments still not in error state"); + } + + @Test + public void testSegmentAssignment() + throws Exception { + String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + + // 1) Capture which segments went into the ERROR state + List erroredSegments = getErroredSegmentsInExternalView(tableNameWithType); + assertFalse(erroredSegments.isEmpty(), "No segments found in ERROR state, expected at least one."); + + // Let the RealtimeSegmentValidationManager run so it can fix up segments + Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS); + LOGGER.info("Triggering RealtimeSegmentValidationManager to reingest errored segments"); + _controllerStarter.getRealtimeSegmentValidationManager().run(); + LOGGER.info("Finished RealtimeSegmentValidationManager to reingest errored segments"); + + // Wait until there are no ERROR segments in the ExternalView + TestUtils.waitForCondition(aVoid -> { + List errorSegmentsRemaining = getErroredSegmentsInExternalView(tableNameWithType); + return errorSegmentsRemaining.isEmpty(); + }, 10000, 100000, "Some segments are still in ERROR state after resetSegments()"); + + // Finally compare metadata across your two tables + compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(tableNameWithType), + _helixResourceManager.getSegmentsZKMetadata( + TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2))); + } + + /** + * Returns the list of segment names in ERROR state from the ExternalView of the given table. + */ + private List getErroredSegmentsInExternalView(String tableName) { + ExternalView resourceEV = _helixResourceManager.getHelixAdmin() + .getResourceExternalView(_helixResourceManager.getHelixClusterName(), tableName); + Map> segmentAssignment = resourceEV.getRecord().getMapFields(); + List erroredSegments = new ArrayList<>(); + for (Map.Entry> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map serverToStateMap = entry.getValue(); + for (String state : serverToStateMap.values()) { + if ("ERROR".equals(state)) { + erroredSegments.add(segmentName); + break; // No need to check other servers for this segment + } + } + } + return erroredSegments; + } + + private void setMaxSegmentCompletionTimeMillis() { + PinotLLCRealtimeSegmentManager realtimeSegmentManager = _helixResourceManager.getRealtimeSegmentManager(); + if (realtimeSegmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) { + ((FailureInjectingPinotLLCRealtimeSegmentManager) realtimeSegmentManager) + .setMaxSegmentCompletionTimeoutMs(MAX_SEGMENT_COMPLETION_TIME_MILLIS); + } + } + + + private void compareZKMetadataForSegments(List segmentsZKMetadata, + List segmentsZKMetadata1) { + Map segmentZKMetadataMap = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata); + Map segmentZKMetadataMap1 = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata1); + segmentZKMetadataMap.forEach((segmentKey, segmentZKMetadata) -> { + SegmentZKMetadata segmentZKMetadata1 = segmentZKMetadataMap1.get(segmentKey); + areSegmentZkMetadataSame(segmentZKMetadata, segmentZKMetadata1); + }); + } + + private void areSegmentZkMetadataSame(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata segmentZKMetadata1) { + if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) { + return; + } + assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata1.getStatus()); + assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata1.getStartOffset()); + assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata1.getEndOffset()); + assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata1.getTotalDocs()); + 
assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata1.getStartTimeMs()); + assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata1.getEndTimeMs()); + } + + private Map getPartitionSegmentNumberToMetadataMap( + List segmentsZKMetadata) { + Map segmentZKMetadataMap = new HashMap<>(); + for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName()); + String segmentKey = llcSegmentName.getPartitionGroupId() + "_" + llcSegmentName.getSequenceNumber(); + segmentZKMetadataMap.put(segmentKey, segmentZKMetadata); + } + return segmentZKMetadataMap; + } + + @AfterClass + public void tearDown() + throws IOException { + LOGGER.info("Tearing down..."); + dropRealtimeTable(getTableName()); + stopServer(); + stopBroker(); + stopController(); + stopKafka(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + private void verifyIdealState(String tableName, int numSegmentsExpected) { + IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName); + Map> segmentAssignment = idealState.getRecord().getMapFields(); + assertEquals(segmentAssignment.size(), numSegmentsExpected); + } + + private boolean atLeastOneErrorSegmentInExternalView(String tableName) { + ExternalView resourceEV = _helixResourceManager.getHelixAdmin() + .getResourceExternalView(_helixResourceManager.getHelixClusterName(), tableName); + Map> segmentAssigment = resourceEV.getRecord().getMapFields(); + for (Map serverToStateMap : segmentAssigment.values()) { + for (String state : serverToStateMap.values()) { + if (state.equals("ERROR")) { + return true; + } + } + } + return false; + } + + private void assertUploadUrlEmpty(List segmentZKMetadataList) { + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + assertNull(segmentZKMetadata.getDownloadUrl()); + } + } + + private boolean assertUrlPresent(List segmentZKMetadataList) { + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING + && segmentZKMetadata.getDownloadUrl() == null) { + LOGGER.warn("URl not found for segment: {}", segmentZKMetadata.getSegmentName()); + return false; + } + } + return true; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingPinotLLCRealtimeSegmentManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingPinotLLCRealtimeSegmentManager.java index 66573e34b543..e737599d2c42 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingPinotLLCRealtimeSegmentManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingPinotLLCRealtimeSegmentManager.java @@ -26,11 +26,16 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailureInjectingPinotLLCRealtimeSegmentManager extends PinotLLCRealtimeSegmentManager { @VisibleForTesting private final Map _failureConfig; + private long _maxSegmentCompletionTimeoutMs = 300000L; + private static final Logger LOGGER = 
LoggerFactory.getLogger(FailureInjectingPinotLLCRealtimeSegmentManager.class); public FailureInjectingPinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { @@ -38,6 +43,24 @@ public FailureInjectingPinotLLCRealtimeSegmentManager(PinotHelixResourceManager _failureConfig = new HashMap<>(); } + @Override + protected boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, + long currentTimeMs) { + Stat stat = new Stat(); + getSegmentZKMetadata(realtimeTableName, segmentName, stat); + if (currentTimeMs > stat.getMtime() + _maxSegmentCompletionTimeoutMs) { + LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", + segmentName, _maxSegmentCompletionTimeoutMs, stat.getMtime(), currentTimeMs); + return true; + } else { + return false; + } + } + + public void setMaxSegmentCompletionTimeoutMs(long maxSegmentCompletionTimeoutMs) { + _maxSegmentCompletionTimeoutMs = maxSegmentCompletionTimeoutMs; + } + @VisibleForTesting public void enableTestFault(String faultType) { _failureConfig.put(faultType, "true"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java new file mode 100644 index 000000000000..960138e8a088 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.realtime.utils; + +import java.util.concurrent.Semaphore; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; + + +/** + * A specialized RealtimeSegmentDataManager that lets us inject a forced failure + * in the commit step, which occurs strictly after the segmentConsumed message. 
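 * The failure is raised from buildSegmentInternal, so the segmentConsumed message has already been sent but the commit/build step fails: the segment stays in COMMITTING status in ZK with no download URL and lands in ERROR state in the ExternalView, which is precisely the condition the reingestion flow is meant to repair.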
+ */ +public class FailureInjectingRealtimeSegmentDataManager extends RealtimeSegmentDataManager { + + // This flag controls whether commit should forcibly fail. + private final boolean _failCommit; + + /** + * Creates a manager that will forcibly fail the commit segment step. + */ + public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, + TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, + IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, + Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics, + boolean failCommit) throws AttemptsExceededException, RetriableOperationException { + // Pass through to the real parent constructor + super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, + indexLoadingConfig, schema, llcSegmentName, partitionGroupConsumerSemaphore, serverMetrics, + null /* no PartitionUpsertMetadataManager */, null /* no PartitionDedupMetadataManager */, + () -> true /* isReadyToConsumeData always true for tests */); + + _failCommit = failCommit; + } + + protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { + if (_failCommit) { + throw new RuntimeException("Forced failure in buildSegmentInternal"); + } + return super.buildSegmentInternal(forCommit); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java new file mode 100644 index 000000000000..2821b6373fd6 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.realtime.utils; + +import java.util.concurrent.Semaphore; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; + + +public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataManager { + private volatile boolean _hasFailedOnce = false; + + public FailureInjectingRealtimeTableDataManager(Semaphore segmentBuildSemaphore) { + this(segmentBuildSemaphore, () -> true); + } + + public FailureInjectingRealtimeTableDataManager(Semaphore segmentBuildSemaphore, + Supplier isServerReadyToServeQueries) { + super(segmentBuildSemaphore, isServerReadyToServeQueries); + } + + @Override + protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata, + TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, + Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData) + throws AttemptsExceededException, RetriableOperationException { + + boolean addFailureToCommits = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + + if (addFailureToCommits) { + if (_hasFailedOnce) { + addFailureToCommits = false; + } else { + _hasFailedOnce = true; + } + } + return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), + indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, addFailureToCommits); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java new file mode 100644 index 000000000000..7129f48e1a4b --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.realtime.utils; + +import com.google.common.cache.Cache; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentLocks; +import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; + + +/** + * Failure-injecting implementation of {@link TableDataManagerProvider} for integration tests: REALTIME tables are served by {@link FailureInjectingRealtimeTableDataManager} so that segment commits can be forced to fail. + */ +public class FailureInjectingTableDataManagerProvider implements TableDataManagerProvider { + private InstanceDataManagerConfig _instanceDataManagerConfig; + private HelixManager _helixManager; + private SegmentLocks _segmentLocks; + private Semaphore _segmentBuildSemaphore; + + @Override + public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, + SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) { + _instanceDataManagerConfig = instanceDataManagerConfig; + _helixManager = helixManager; + _segmentLocks = segmentLocks; + int maxParallelSegmentBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds(); + _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null; + } + + @Override + public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, + @Nullable Cache, SegmentErrorInfo> errorCache, + Supplier isServerReadyToServeQueries) { + TableDataManager tableDataManager; + switch (tableConfig.getTableType()) { + case OFFLINE: + if (tableConfig.isDimTable()) { + tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableConfig.getTableName()); + } else { + tableDataManager = new OfflineTableDataManager(); + } + break; + case REALTIME: + Map streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); + if (Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)) + && StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) { + throw new IllegalStateException(String.format("Table has enabled %s config. But the server has not " + + "configured the segmentstore uri. 
Configure the server config %s", + StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI)); + } + tableDataManager = + new FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries); + break; + default: + throw new IllegalStateException(); + } + tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentPreloadExecutor, + errorCache, null); + return tableDataManager; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java new file mode 100644 index 000000000000..20895e45a123 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.writer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import 
org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentWriter { + + private static final int DEFAULT_CAPACITY = 100_000; + private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000; + public static final String SEGMENT_STATS_FILE_NAME = "segment-stats.ser"; + public static final String RESOURCE_TMP_DIR_PREFIX = "resourceTmpDir_"; + public static final String RESOURCE_DATA_DIR_PREFIX = "resourceDataDir_"; + + private final Semaphore _segBuildSemaphore; + private final String _segmentName; + private final String _tableNameWithType; + private final int _partitionGroupId; + private final SegmentZKMetadata _segmentZKMetadata; + private final TableConfig _tableConfig; + private final Schema _schema; + private final StreamConfig _streamConfig; + private final StreamPartitionMsgOffsetFactory _offsetFactory; + private final StreamConsumerFactory _consumerFactory; + private StreamMetadataProvider _partitionMetadataProvider; + private final PartitionGroupConsumer _consumer; + private final StreamDataDecoder _decoder; + private final MutableSegmentImpl _realtimeSegment; + private final File _resourceTmpDir; + private final File _resourceDataDir; + private final Logger _logger; + private Thread _consumerThread; + private final AtomicBoolean _isDoneConsuming = new AtomicBoolean(false); + private final StreamPartitionMsgOffset _startOffset; + private final StreamPartitionMsgOffset _endOffset; + private volatile StreamPartitionMsgOffset _currentOffset; + private final int _fetchTimeoutMs; + private final TransformPipeline _transformPipeline; + private volatile boolean _isSuccess = false; + private volatile Throwable _consumptionException; + + public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, + @Nullable Semaphore segBuildSemaphore) + throws Exception { + Preconditions.checkNotNull(indexLoadingConfig.getTableConfig(), "Table 
config must be set in index loading config"); + Preconditions.checkNotNull(indexLoadingConfig.getSchema(), "Schema must be set in index loading config"); + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName()); + + _segmentName = segmentZKMetadata.getSegmentName(); + _partitionGroupId = llcSegmentName.getPartitionGroupId(); + _segBuildSemaphore = segBuildSemaphore; + _tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName()); + _segmentZKMetadata = segmentZKMetadata; + _tableConfig = indexLoadingConfig.getTableConfig(); + _schema = indexLoadingConfig.getSchema(); + String tableDataDir = indexLoadingConfig.getTableDataDir() == null ? FileUtils.getTempDirectory().getAbsolutePath() + : indexLoadingConfig.getTableDataDir(); + File reingestionDir = new File(tableDataDir, "reingestion"); + _resourceTmpDir = + new File(reingestionDir, RESOURCE_TMP_DIR_PREFIX + _segmentName + "_" + System.currentTimeMillis()); + _resourceDataDir = + new File(reingestionDir, RESOURCE_DATA_DIR_PREFIX + _segmentName + "_" + System.currentTimeMillis()); + + _logger = LoggerFactory.getLogger(StatelessRealtimeSegmentWriter.class.getName() + "_" + _segmentName); + + Map streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0); + _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMap); + + _offsetFactory = StreamConsumerFactoryProvider.create(_streamConfig).createStreamMsgOffsetFactory(); + _startOffset = _offsetFactory.create(segmentZKMetadata.getStartOffset()); + _endOffset = _offsetFactory.create(segmentZKMetadata.getEndOffset()); + + String clientId = getClientId(); + + // Temp dirs + _resourceTmpDir.mkdirs(); + _resourceDataDir.mkdirs(); + + _consumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); + _partitionMetadataProvider = _consumerFactory.createPartitionMetadataProvider(clientId, _partitionGroupId); + + // Create a simple PartitionGroupConsumptionStatus + PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = + new PartitionGroupConsumptionStatus(_partitionGroupId, 0, _startOffset, null, null); + + _consumer = _consumerFactory.createPartitionGroupConsumer(clientId, partitionGroupConsumptionStatus); + + // Initialize decoder + Set fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema); + _decoder = createDecoder(fieldsToRead); + + // Fetch capacity from indexLoadingConfig or use default + int capacity = _streamConfig.getFlushThresholdRows(); + if (capacity <= 0) { + capacity = DEFAULT_CAPACITY; + } + + // Fetch average number of multi-values from indexLoadingConfig + int avgNumMultiValues = indexLoadingConfig.getRealtimeAvgMultiValueCount(); + + // Load stats history, here we are using the same stats while as the RealtimeSegmentDataManager so that we are + // much more efficient in allocating buffers. 
It also works with empty file + File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME); + RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsHistoryFile); + + // Initialize mutable segment with configurations + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType) + .setSegmentName(_segmentName).setStreamName(_streamConfig.getTopicName()) + .setSegmentZKMetadata(_segmentZKMetadata).setStatsHistory(statsHistory).setSchema(_schema) + .setCapacity(capacity).setAvgNumMultiValues(avgNumMultiValues) + .setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation()) + .setFieldConfigList(_tableConfig.getFieldConfigList()).setConsumerDir(_resourceDataDir.getAbsolutePath()) + .setMemoryManager( + new MmapMemoryManager(FileUtils.getTempDirectory().getAbsolutePath(), _segmentName, null)); + + setPartitionParameters(realtimeSegmentConfigBuilder, _tableConfig.getIndexingConfig().getSegmentPartitionConfig()); + + _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null); + + _transformPipeline = new TransformPipeline(_tableConfig, _schema); + + // Initialize fetch timeout + _fetchTimeoutMs = + _streamConfig.getFetchTimeoutMillis() > 0 ? _streamConfig.getFetchTimeoutMillis() : DEFAULT_FETCH_TIMEOUT_MS; + } + + private String getClientId() { + return _tableNameWithType + "-" + _partitionGroupId; + } + + public void startConsumption() { + // Start the consumer thread + _consumerThread = new Thread(new PartitionConsumer(), _segmentName); + _consumerThread.start(); + } + + private StreamDataDecoder createDecoder(Set fieldsToRead) + throws Exception { + AtomicReference localStreamDataDecoder = new AtomicReference<>(); + RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f); + retryPolicy.attempt(() -> { + try { + StreamMessageDecoder streamMessageDecoder = createMessageDecoder(fieldsToRead); + localStreamDataDecoder.set(new StreamDataDecoderImpl(streamMessageDecoder)); + return true; + } catch (Exception e) { + _logger.warn("Failed to create StreamMessageDecoder. Retrying...", e); + return false; + } + }); + return localStreamDataDecoder.get(); + } + + /** + * Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}. 
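 * The decoder class itself comes from the table's stream config (for Kafka this is typically the stream.kafka.decoder.class.name property), so reingestion decodes records the same way the original consuming segment did.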
+ * + * @param fieldsToRead The fields to read from the source stream + * @return The initialized StreamMessageDecoder + */ + private StreamMessageDecoder createMessageDecoder(Set fieldsToRead) { + String decoderClass = _streamConfig.getDecoderClass(); + try { + StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass); + decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema); + return decoder; + } catch (Exception e) { + throw new RuntimeException( + "Caught exception while creating StreamMessageDecoder from stream config: " + _streamConfig, e); + } + } + + private class PartitionConsumer implements Runnable { + @Override + public void run() { + try { + _consumer.start(_startOffset); + _logger.info("Created new consumer thread {} for {}", _consumerThread, this); + _currentOffset = _startOffset; + TransformPipeline.Result reusedResult = new TransformPipeline.Result(); + while (_currentOffset.compareTo(_endOffset) < 0) { + // Fetch messages + MessageBatch messageBatch = _consumer.fetchMessages(_currentOffset, _fetchTimeoutMs); + + int messageCount = messageBatch.getMessageCount(); + + for (int i = 0; i < messageCount; i++) { + StreamMessage streamMessage = messageBatch.getStreamMessage(i); + if (streamMessage.getMetadata() != null && streamMessage.getMetadata().getOffset() != null + && streamMessage.getMetadata().getOffset().compareTo(_endOffset) >= 0) { + _logger.info("Reached end offset: {} for partition group: {}", _endOffset, _partitionGroupId); + break; + } + + // Decode message + StreamDataDecoderResult decodedResult = _decoder.decode(streamMessage); + if (decodedResult.getException() == null) { + // Index message + GenericRow row = decodedResult.getResult(); + + _transformPipeline.processRow(row, reusedResult); + + List transformedRows = reusedResult.getTransformedRows(); + + for (GenericRow transformedRow : transformedRows) { + _realtimeSegment.index(transformedRow, streamMessage.getMetadata()); + } + } else { + _logger.warn("Failed to decode message at offset {}: {}", _currentOffset, decodedResult.getException()); + } + } + + _currentOffset = messageBatch.getOffsetOfNextBatch(); + } + _isSuccess = true; + } catch (Exception e) { + _logger.error("Exception in consumer thread", e); + _consumptionException = e; + throw new RuntimeException(e); + } finally { + try { + _consumer.close(); + } catch (Exception e) { + _logger.warn("Failed to close consumer", e); + } + _isDoneConsuming.set(true); + } + } + } + + public void stopConsumption() { + if (_consumerThread.isAlive()) { + _consumerThread.interrupt(); + try { + _consumerThread.join(); + } catch (InterruptedException e) { + _logger.warn("Interrupted while waiting for consumer thread to finish"); + } + } + } + + public void close() { + stopConsumption(); + _realtimeSegment.destroy(); + FileUtils.deleteQuietly(_resourceTmpDir); + FileUtils.deleteQuietly(_resourceDataDir); + } + + public boolean isDoneConsuming() { + return _isDoneConsuming.get(); + } + + public boolean isSuccess() { + return _isSuccess; + } + + public Throwable getConsumptionException() { + return _consumptionException; + } + + @VisibleForTesting + public File buildSegmentInternal() + throws Exception { + _logger.info("Building segment from {} to {}", _startOffset, _currentOffset); + final long startTimeMillis = now(); + try { + if (_segBuildSemaphore != null) { + _logger.info("Trying to acquire semaphore for building segment"); + Instant acquireStart = Instant.now(); + int timeoutSeconds = 5; + while 
(!_segBuildSemaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) { + _logger.warn("Could not acquire semaphore for building segment in {}", + Duration.between(acquireStart, Instant.now())); + timeoutSeconds = Math.min(timeoutSeconds * 2, 300); + } + } + } catch (InterruptedException e) { + String errorMessage = "Interrupted while waiting for semaphore"; + _logger.error(errorMessage, e); + return null; + } + + final long lockAcquireTimeMillis = now(); + final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis; + _logger.info("Acquired lock for building segment in {} ms", waitTimeMillis); + // Build a segment from in-memory rows. + // Use a temporary directory + Path tempSegmentFolder = null; + try { + tempSegmentFolder = + java.nio.file.Files.createTempDirectory(_resourceTmpDir.toPath(), "tmp-" + _segmentName + "-"); + } catch (IOException e) { + _logger.error("Failed to create temporary directory for segment build", e); + return null; + } + + SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig(); + segmentZKPropsConfig.setStartOffset(_startOffset.toString()); + segmentZKPropsConfig.setEndOffset(_endOffset.toString()); + + // Build the segment + RealtimeSegmentConverter converter = + new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, tempSegmentFolder.toString(), _schema, + _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), + _tableConfig.getIndexingConfig().isNullHandlingEnabled()); + try { + converter.build(null, null); + } catch (Exception e) { + _logger.error("Failed to build segment", e); + FileUtils.deleteQuietly(tempSegmentFolder.toFile()); + return null; + } + final long buildTimeMillis = now() - lockAcquireTimeMillis; + _logger.info("Successfully built segment (Column Mode: {}) in {} ms, after lockWaitTime {} ms", + converter.isColumnMajorEnabled(), buildTimeMillis, waitTimeMillis); + + File dataDir = _resourceDataDir; + File indexDir = new File(dataDir, _segmentName); + FileUtils.deleteQuietly(indexDir); + + File tempIndexDir = new File(tempSegmentFolder.toFile(), _segmentName); + if (!tempIndexDir.exists()) { + _logger.error("Temp index directory {} does not exist", tempIndexDir); + FileUtils.deleteQuietly(tempSegmentFolder.toFile()); + return null; + } + try { + FileUtils.moveDirectory(tempIndexDir, indexDir); + } catch (IOException e) { + _logger.error("Caught exception while moving index directory from: {} to: {}", tempIndexDir, indexDir, e); + return null; + } finally { + FileUtils.deleteQuietly(tempSegmentFolder.toFile()); + } + + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + + long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir); + File segmentTarFile = new File(dataDir, _segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + try { + TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile); + } catch (IOException e) { + _logger.error("Caught exception while tarring index directory from: {} to: {}", indexDir, segmentTarFile, e); + return null; + } + + File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir); + if (metadataFile == null) { + _logger.error("Failed to find metadata file under index directory: {}", indexDir); + return null; + } + File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir); + if (creationMetaFile == null) { + _logger.error("Failed to find creation meta file under index directory: {}", indexDir); + return null; + } + Map metadataFiles = new HashMap<>(); + 
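 // Collect the segment metadata and creation.meta files generated by the build; only the tar file returned below is handed back to the caller for upload.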
metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFile); + metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile); + return segmentTarFile; + } + + protected long now() { + return System.currentTimeMillis(); + } + + /* + * set the following partition parameters in RT segment config builder: + * - partition column + * - partition function + * - partition group id + */ + private void setPartitionParameters(RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder, + SegmentPartitionConfig segmentPartitionConfig) { + if (segmentPartitionConfig != null) { + Map columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); + if (columnPartitionMap.size() == 1) { + Map.Entry entry = columnPartitionMap.entrySet().iterator().next(); + String partitionColumn = entry.getKey(); + ColumnPartitionConfig columnPartitionConfig = entry.getValue(); + String partitionFunctionName = columnPartitionConfig.getFunctionName(); + + // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning and emit a + // metric when they don't match, but use the one from the stream. The mismatch could happen when the + // stream partitions are changed, but the table config has not been updated to reflect the change. + // In such case, picking the number of partitions from the stream can keep the segment properly + // partitioned as long as the partition function is not changed. + int numPartitions = columnPartitionConfig.getNumPartitions(); + try { + // TODO: currentPartitionGroupConsumptionStatus should be fetched from idealState + segmentZkMetadata, + // so that we get back accurate partitionGroups info + // However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has + // a single partition + // Fix this before opening support for partitioning in Kinesis + int numPartitionGroups = + _partitionMetadataProvider.computePartitionGroupMetadata(getClientId(), _streamConfig, + Collections.emptyList(), /*maxWaitTimeMs=*/5000).size(); + + if (numPartitionGroups != numPartitions) { + _logger.info( + "Number of stream partitions: {} does not match number of partitions in the partition config: {}, " + + "using number of stream " + "partitions", numPartitionGroups, numPartitions); + numPartitions = numPartitionGroups; + } + } catch (Exception e) { + _logger.warn("Failed to get number of stream partitions in 5s, " + + "using number of partitions in the partition config: {}", numPartitions, e); + createPartitionMetadataProvider("Timeout getting number of stream partitions"); + } + + realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn); + realtimeSegmentConfigBuilder.setPartitionFunction( + PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions, null)); + realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId); + } else { + _logger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet()); + } + } + } + + /** + * Creates a new stream metadata provider + */ + private void createPartitionMetadataProvider(String reason) { + closePartitionMetadataProvider(); + _logger.info("Creating new partition metadata provider, reason: {}", reason); + _partitionMetadataProvider = _consumerFactory.createPartitionMetadataProvider(getClientId(), _partitionGroupId); + } + + private void closePartitionMetadataProvider() { + if (_partitionMetadataProvider != null) { + try { + _partitionMetadataProvider.close(); + } catch (Exception e) { + _logger.warn("Could not close 
stream metadata provider", e); + } + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java new file mode 100644 index 000000000000..c44a4b462ee4 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.realtime.writer.StatelessRealtimeSegmentWriter; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.server.api.resources.reingestion.ReingestionResponse; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +@Api(tags = "Reingestion", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY), + 
@Authorization(value = DATABASE)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, + key = SWAGGER_AUTHORIZATION_KEY, + description = "The format of the key is ```\"Basic \" or \"Bearer \"```"), + @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, + description = "Database context passed through http header. If no context is provided 'default' database " + + "context will be considered.")})) +@Path("/") +public class ReingestionResource { + private static final Logger LOGGER = LoggerFactory.getLogger(ReingestionResource.class); + + //TODO: Make this configurable + private static final int MIN_REINGESTION_THREADS = 2; + private static final int MAX_PARALLEL_REINGESTIONS = + Math.max(Runtime.getRuntime().availableProcessors() / 2, MIN_REINGESTION_THREADS); + + // Tracks if a particular segment is currently being re-ingested + private static final ConcurrentHashMap + SEGMENT_INGESTION_MAP = new ConcurrentHashMap<>(); + + // Executor for asynchronous re-ingestion + private static final ExecutorService REINGESTION_EXECUTOR = Executors.newFixedThreadPool(MAX_PARALLEL_REINGESTIONS, + new ThreadFactoryBuilder().setNameFormat("reingestion-worker-%d").build()); + + // Keep track of jobs by jobId => job info + private static final ConcurrentHashMap RUNNING_JOBS = new ConcurrentHashMap<>(); + public static final long CONSUMPTION_END_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); + public static final long CHECK_INTERVAL_MS = Duration.ofSeconds(5).toMillis(); + + @Inject + private ServerInstance _serverInstance; + + /** + * Simple data class to hold job details. + */ + private static class ReingestionJob { + private final String _jobId; + private final String _tableNameWithType; + private final String _segmentName; + private final long _startTimeMs; + + ReingestionJob(String jobId, String tableNameWithType, String segmentName) { + _jobId = jobId; + _tableNameWithType = tableNameWithType; + _segmentName = segmentName; + _startTimeMs = System.currentTimeMillis(); + } + + public String getJobId() { + return _jobId; + } + + public String getTableNameWithType() { + return _tableNameWithType; + } + + public String getSegmentName() { + return _segmentName; + } + + public long getStartTimeMs() { + return _startTimeMs; + } + } + + /** + * New API to get all running re-ingestion jobs. 
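 * Each job in the response is serialized from the ReingestionJob holder above, i.e. it exposes jobId, tableNameWithType, segmentName and startTimeMs.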
+ */ + @GET + @Path("/reingestSegment/jobs") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation("Get all running re-ingestion jobs along with job IDs") + public Response getAllRunningReingestionJobs() { + // Filter only the jobs still marked as running + List runningJobs = new ArrayList<>(RUNNING_JOBS.values()); + return Response.ok(runningJobs).build(); + } + + @POST + @Path("/reingestSegment/{segmentName}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Re-ingest segment asynchronously", notes = "Returns a jobId immediately; ingestion runs in " + + "background.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = ReingestionResponse.class), + @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class) + }) + public Response reingestSegment(@PathParam("segmentName") String segmentName) { + // if segment is not in LLC format, return error + if (!LLCSegmentName.isLLCSegment(segmentName)) { + throw new WebApplicationException("Segment name is not in LLC format: " + segmentName, + Response.Status.BAD_REQUEST); + } + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName()); + + InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); + if (instanceDataManager == null) { + throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR); + } + + RealtimeTableDataManager tableDataManager = + (RealtimeTableDataManager) instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager == null) { + throw new WebApplicationException("Table data manager not found for table: " + tableNameWithType, + Response.Status.NOT_FOUND); + } + + IndexLoadingConfig indexLoadingConfig = tableDataManager.fetchIndexLoadingConfig(); + LOGGER.info("Executing re-ingestion for table: {}, segment: {}", tableNameWithType, segmentName); + + // Get TableConfig, Schema, ZK metadata + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig == null) { + throw new WebApplicationException("Table config not found for table: " + tableNameWithType, + Response.Status.NOT_FOUND); + } + Schema schema = indexLoadingConfig.getSchema(); + if (schema == null) { + throw new WebApplicationException("Schema not found for table: " + tableNameWithType, + Response.Status.NOT_FOUND); + } + + SegmentZKMetadata segmentZKMetadata = tableDataManager.fetchZKMetadata(segmentName); + if (segmentZKMetadata == null) { + throw new WebApplicationException("Segment metadata not found for segment: " + segmentName, + Response.Status.NOT_FOUND); + } + + // Check if download url is present + if (segmentZKMetadata.getDownloadUrl() != null) { + throw new WebApplicationException( + "Download URL is already present for segment: " + segmentName + ". 
No need to re-ingest.", + Response.Status.BAD_REQUEST); + } + + // Grab start/end offsets + String startOffsetStr = segmentZKMetadata.getStartOffset(); + String endOffsetStr = segmentZKMetadata.getEndOffset(); + if (startOffsetStr == null || endOffsetStr == null) { + throw new WebApplicationException("Null start/end offset for segment: " + segmentName, + Response.Status.INTERNAL_SERVER_ERROR); + } + + // Check if this segment is already being re-ingested + AtomicBoolean isIngesting = SEGMENT_INGESTION_MAP.computeIfAbsent(segmentName, k -> new AtomicBoolean(false)); + if (!isIngesting.compareAndSet(false, true)) { + return Response.status(Response.Status.CONFLICT) + .entity("Re-ingestion for segment: " + segmentName + " is already in progress.") + .build(); + } + + // Generate a jobId for tracking + String jobId = UUID.randomUUID().toString(); + ReingestionJob job = new ReingestionJob(jobId, tableNameWithType, segmentName); + + // Kick off the actual work asynchronously + REINGESTION_EXECUTOR.submit(() -> { + try { + StatelessRealtimeSegmentWriter manager = + new StatelessRealtimeSegmentWriter(segmentZKMetadata, indexLoadingConfig, + tableDataManager.getSegmentBuildSemaphore()); + + RUNNING_JOBS.put(jobId, job); + doReingestSegment(manager, llcSegmentName, tableNameWithType, indexLoadingConfig); + } catch (Exception e) { + LOGGER.error("Error during async re-ingestion for job {} (segment={})", jobId, segmentName, e); + } finally { + isIngesting.set(false); + RUNNING_JOBS.remove(jobId); + SEGMENT_INGESTION_MAP.remove(segmentName); + } + }); + + ReingestionResponse immediateResponse = new ReingestionResponse( + "Re-ingestion job submitted successfully with jobId: " + jobId); + return Response.ok(immediateResponse).build(); + } + + /** + * The actual re-ingestion logic, moved into a separate method for clarity. + * This is essentially the old synchronous logic you had in reingestSegment. 
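 * It consumes from the segment's start offset to its end offset, waits up to CONSUMPTION_END_TIMEOUT_MS for consumption to finish, builds and tars the segment, and uploads the result through ServerSegmentCompletionProtocolHandler#uploadReingestedSegment.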
+ */ + private void doReingestSegment(StatelessRealtimeSegmentWriter manager, LLCSegmentName llcSegmentName, + String tableNameWithType, IndexLoadingConfig indexLoadingConfig) + throws Exception { + try { + String segmentName = llcSegmentName.getSegmentName(); + + manager.startConsumption(); + waitForCondition((Void) -> manager.isDoneConsuming(), CHECK_INTERVAL_MS, CONSUMPTION_END_TIMEOUT_MS, 0); + manager.stopConsumption(); + + if (!manager.isSuccess()) { + throw new Exception("Consumer failed: " + manager.getConsumptionException()); + } + + LOGGER.info("Starting build for segment {}", segmentName); + File segmentTarFile = manager.buildSegmentInternal(); + if (segmentTarFile == null) { + throw new Exception("Failed to build segment: " + segmentName); + } + + ServerSegmentCompletionProtocolHandler protocolHandler = + new ServerSegmentCompletionProtocolHandler(_serverInstance.getServerMetrics(), tableNameWithType); + protocolHandler.uploadReingestedSegment(segmentName, indexLoadingConfig.getSegmentStoreURI(), segmentTarFile); + LOGGER.info("Re-ingested segment {} uploaded successfully", segmentName); + } finally { + manager.close(); + } + } + + private void waitForCondition( + Function condition, long checkIntervalMs, long timeoutMs, long gracePeriodMs) { + long endTime = System.currentTimeMillis() + timeoutMs; + + // Adding grace period before starting the condition checks + if (gracePeriodMs > 0) { + LOGGER.info("Waiting for a grace period of {} ms before starting condition checks", gracePeriodMs); + try { + Thread.sleep(gracePeriodMs); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted during grace period wait", e); + } + } + + while (System.currentTimeMillis() < endTime) { + try { + if (Boolean.TRUE.equals(condition.apply(null))) { + LOGGER.info("Condition satisfied: {}", condition); + return; + } + Thread.sleep(checkIntervalMs); + } catch (Exception e) { + throw new RuntimeException("Caught exception while checking the condition", e); + } + } + + throw new RuntimeException("Timeout waiting for condition: " + condition); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionRequest.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionRequest.java new file mode 100644 index 000000000000..06965db15ac1 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion; + +public class ReingestionRequest { + private String _tableNameWithType; + private String _segmentName; + + public String getTableNameWithType() { + return _tableNameWithType; + } + + public void setTableNameWithType(String tableNameWithType) { + _tableNameWithType = tableNameWithType; + } + + public String getSegmentName() { + return _segmentName; + } + + public void setSegmentName(String segmentName) { + _segmentName = segmentName; + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionResponse.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionResponse.java new file mode 100644 index 000000000000..63b7269d937a --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/ReingestionResponse.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources.reingestion; + +public class ReingestionResponse { + private String _message; + + public ReingestionResponse(String message) { + _message = message; + } + + // Getter and setter + public String getMessage() { + return _message; + } + + public void setMessage(String message) { + _message = message; + } +}
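For illustration, here is a minimal client-side sketch of exercising the new server endpoints defined above. Only the paths /reingestSegment/{segmentName} and /reingestSegment/jobs come from this patch; the host, port, segment name and the use of the JDK HttpClient are assumptions made for the example.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReingestionClientSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder address of the Pinot server hosting the errored segment (assumption).
    String serverBaseUrl = "http://localhost:8097";
    // Hypothetical LLC segment name (table__partition__sequence__creationTime).
    String segmentName = "myTable__0__42__20240101T0000Z";

    HttpClient client = HttpClient.newHttpClient();

    // Kick off asynchronous reingestion; the server answers immediately with a message containing the jobId.
    HttpRequest trigger = HttpRequest.newBuilder()
        .uri(URI.create(serverBaseUrl + "/reingestSegment/" + segmentName))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    System.out.println(client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());

    // Poll the jobs endpoint to see which reingestion jobs are still running.
    HttpRequest jobs = HttpRequest.newBuilder()
        .uri(URI.create(serverBaseUrl + "/reingestSegment/jobs"))
        .GET()
        .build();
    System.out.println(client.send(jobs, HttpResponse.BodyHandlers.ofString()).body());
  }
}

The integration test above drives the same flow indirectly: running the RealtimeSegmentValidationManager lets the controller detect the errored pauseless segment and request reingestion from the server, so the manual calls shown here are only for experimentation.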