diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index c51dda97fa37..62b2876f9e81 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -2199,8 +2199,8 @@ URI createSegmentPath(String rawTableName, String segmentName) { } /** - * Re-ingests segments that are in DONE status with a missing download URL, but also - * have no peer copy on any server. This method will call the server reIngestSegment API + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API * on one of the alive servers that are supposed to host that segment according to IdealState. * * API signature: @@ -2208,9 +2208,7 @@ URI createSegmentPath(String rawTableName, String segmentName) { * Request body (JSON): * { * "tableNameWithType": [tableName], - * "segmentName": [segmentName], - * "uploadURI": [leadControllerUrl], - * "uploadSegment": true + * "segmentName": [segmentName] * } * * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java index 9b8b6600f466..2d9eb10f5844 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java @@ -128,6 +128,8 @@ public class ReIngestionResource { // Keep track of jobs by jobId => job info private static final ConcurrentHashMap RUNNING_JOBS = new ConcurrentHashMap<>(); + public static final long CONSUMPTION_END_TIMEOUT_MS = 300000L; + public static final long UPLOAD_END_TIMEOUT_MS = 300000L; @Inject private ServerInstance _serverInstance; @@ -290,7 +292,7 @@ private void doReIngestSegment(SimpleRealtimeSegmentDataManager manager, String throws Exception { try { manager.startConsumption(); - waitForCondition((Void) -> manager.isDoneConsuming(), 1000, 300_000, 0); + waitForCondition((Void) -> manager.isDoneConsuming(), 1000, CONSUMPTION_END_TIMEOUT_MS, 0); manager.stopConsumption(); if (!manager.isSuccess()) { @@ -338,7 +340,7 @@ private void doReIngestSegment(SimpleRealtimeSegmentDataManager manager, String } SegmentDataManager segDataManager = tableDataManager.acquireSegment(segmentName); return segDataManager instanceof ImmutableSegmentDataManager; - }, 5000, 300_000, 0); + }, 5000, UPLOAD_END_TIMEOUT_MS, 0); // Trigger segment reset HttpClient httpClient = HttpClient.getInstance();