
refactoring: rename SimpleRealtimeSegmentDataManager to StatelessRealtimeSegmentDataManager, register re-ingestion jobs from within the async task, and derive timeout constants from java.time.Duration (raising the consumption timeout from 5 to 30 minutes)
KKCorps committed Jan 30, 2025
1 parent e84788a commit 0d46327
Showing 2 changed files with 19 additions and 20 deletions.
ReIngestionResource.java
@@ -34,6 +34,7 @@
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -79,7 +80,7 @@
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
 import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
-import org.apache.pinot.server.api.resources.reingestion.utils.SimpleRealtimeSegmentDataManager;
+import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.ServerInstance;
@@ -128,8 +129,9 @@ public class ReIngestionResource {

 // Keep track of jobs by jobId => job info
 private static final ConcurrentHashMap<String, ReIngestionJob> RUNNING_JOBS = new ConcurrentHashMap<>();
-public static final long CONSUMPTION_END_TIMEOUT_MS = 300000L;
-public static final long UPLOAD_END_TIMEOUT_MS = 300000L;
+public static final long CONSUMPTION_END_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
+public static final long UPLOAD_END_TIMEOUT_MS = Duration.ofMinutes(5).toMillis();
+public static final long CHECK_INTERVAL_MS = Duration.ofSeconds(5).toMillis();

 @Inject
 private ServerInstance _serverInstance;
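Beyond readability, the new constants change behavior: CONSUMPTION_END_TIMEOUT_MS grows from 300000 ms (5 minutes) to 30 minutes, UPLOAD_END_TIMEOUT_MS keeps its effective 5-minute value, and a shared CHECK_INTERVAL_MS of 5 seconds replaces the magic 1000/5000 polling intervals used further down. A standalone sanity check of the arithmetic (plain Java, not Pinot code):

import java.time.Duration;

public class TimeoutSanityCheck {
  public static void main(String[] args) {
    // Consumption timeout: was the literal 300000L (5 min), now 30 min
    System.out.println(Duration.ofMinutes(30).toMillis()); // 1800000
    // Upload timeout: unchanged in effect, 5 min = 300000 ms
    System.out.println(Duration.ofMinutes(5).toMillis());  // 300000
    // Shared polling interval: 5 s = 5000 ms
    System.out.println(Duration.ofSeconds(5).toMillis());  // 5000
  }
}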
@@ -250,12 +252,6 @@ public Response reIngestSegment(ReIngestionRequest request) {
 // Generate a jobId for tracking
 String jobId = UUID.randomUUID().toString();
 ReIngestionJob job = new ReIngestionJob(jobId, tableNameWithType, segmentName);
-RUNNING_JOBS.put(jobId, job);
-
-// Send immediate success response with jobId
-ReIngestionResponse immediateResponse = new ReIngestionResponse(
-    "Re-ingestion job submitted successfully with jobId: " + jobId);
-Response response = Response.ok(immediateResponse).build();

 // Kick off the actual work asynchronously
 REINGESTION_EXECUTOR.submit(() -> {
@@ -266,10 +262,11 @@ public Response reIngestSegment(ReIngestionRequest request) {
 Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
 StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);

-SimpleRealtimeSegmentDataManager manager = new SimpleRealtimeSegmentDataManager(
+StatelessRealtimeSegmentDataManager manager = new StatelessRealtimeSegmentDataManager(
     segmentName, tableNameWithType, partitionGroupId, segmentZKMetadata, tableConfig, schema,
     indexLoadingConfig, streamConfig, startOffsetStr, endOffsetStr, _serverInstance.getServerMetrics());

+RUNNING_JOBS.put(jobId, job);
 doReIngestSegment(manager, segmentName, tableNameWithType, indexLoadingConfig, tableDataManager);
 } catch (Exception e) {
 LOGGER.error("Error during async re-ingestion for job {} (segment={})", jobId, segmentName, e);
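With this hunk, RUNNING_JOBS.put(jobId, job) moves inside the submitted task, so a job is registered only after its StatelessRealtimeSegmentDataManager has been constructed on the executor thread, and the HTTP response (now built at the end of the method, see the next hunk) cannot advertise a job that never started. A minimal, self-contained illustration of the submit-then-respond pattern — hypothetical names, not Pinot code; the finally-cleanup is an assumption added for completeness:

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncJobPattern {
  private static final ConcurrentHashMap<String, String> RUNNING_JOBS = new ConcurrentHashMap<>();
  private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

  static String submit(Runnable work) {
    String jobId = UUID.randomUUID().toString();
    EXECUTOR.submit(() -> {
      // Register only once the work actually starts, as in the diff above
      RUNNING_JOBS.put(jobId, "RUNNING");
      try {
        work.run();
      } finally {
        RUNNING_JOBS.remove(jobId); // cleanup is illustrative, not shown in the diff
      }
    });
    return jobId; // caller responds immediately with the job handle
  }

  public static void main(String[] args) {
    String id = submit(() -> System.out.println("re-ingesting..."));
    System.out.println("submitted job " + id);
    EXECUTOR.shutdown();
  }
}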
@@ -279,28 +276,30 @@ public Response reIngestSegment(ReIngestionRequest request) {
 }
 });

-// Return immediately with the jobId
-return response;
+ReIngestionResponse immediateResponse = new ReIngestionResponse(
+    "Re-ingestion job submitted successfully with jobId: " + jobId);
+return Response.ok(immediateResponse).build();
 }

 /**
  * The actual re-ingestion logic, moved into a separate method for clarity.
  * This is essentially the old synchronous logic you had in reIngestSegment.
  */
-private void doReIngestSegment(SimpleRealtimeSegmentDataManager manager, String segmentName, String tableNameWithType,
-    IndexLoadingConfig indexLoadingConfig, TableDataManager tableDataManager)
+private void doReIngestSegment(StatelessRealtimeSegmentDataManager manager, String segmentName,
+    String tableNameWithType, IndexLoadingConfig indexLoadingConfig, TableDataManager tableDataManager)
     throws Exception {
 try {
 manager.startConsumption();
-waitForCondition((Void) -> manager.isDoneConsuming(), 1000, CONSUMPTION_END_TIMEOUT_MS, 0);
+waitForCondition((Void) -> manager.isDoneConsuming(), CHECK_INTERVAL_MS,
+    CONSUMPTION_END_TIMEOUT_MS, 0);
 manager.stopConsumption();

 if (!manager.isSuccess()) {
 throw new Exception("Consumer failed: " + manager.getConsumptionException());
 }

 LOGGER.info("Starting build for segment {}", segmentName);
-SimpleRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor =
+StatelessRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor =
     manager.buildSegmentInternal();

 File segmentTarFile = segmentBuildDescriptor.getSegmentTarFile();
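Both wait sites now share CHECK_INTERVAL_MS instead of the magic 1000 and 5000 values. The helper's body is outside this diff; a hypothetical sketch consistent with the call sites above, assuming the signature (condition, checkIntervalMs, timeoutMs, initialDelayMs) — the meaning of the trailing 0 argument is an assumption:

import java.util.function.Function;

final class WaitUtil {
  // Hypothetical polling helper, NOT the actual Pinot implementation
  static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs,
      long timeoutMs, long initialDelayMs) throws InterruptedException {
    Thread.sleep(initialDelayMs);
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (Boolean.TRUE.equals(condition.apply(null))) {
        return; // condition satisfied within the timeout
      }
      Thread.sleep(checkIntervalMs);
    }
    throw new IllegalStateException("Timed out after " + timeoutMs + " ms waiting for condition");
  }
}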
@@ -340,7 +339,7 @@ private void doReIngestSegment(SimpleRealtimeSegmentDataManager manager, String
 }
 SegmentDataManager segDataManager = tableDataManager.acquireSegment(segmentName);
 return segDataManager instanceof ImmutableSegmentDataManager;
-}, 5000, UPLOAD_END_TIMEOUT_MS, 0);
+}, CHECK_INTERVAL_MS, UPLOAD_END_TIMEOUT_MS, 0);

 // Trigger segment reset
 HttpClient httpClient = HttpClient.getInstance();
StatelessRealtimeSegmentDataManager.java (renamed from SimpleRealtimeSegmentDataManager.java)
@@ -79,7 +79,7 @@
 /**
  * Simplified Segment Data Manager for ingesting data from a start offset to an end offset.
  */
-public class SimpleRealtimeSegmentDataManager extends SegmentDataManager {
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {

 private static final int DEFAULT_CAPACITY = 100_000;
 private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
@@ -116,7 +116,7 @@ public class SimpleRealtimeSegmentDataManager extends SegmentDataManager {
 private volatile Throwable _consumptionException;
 private final ServerMetrics _serverMetrics;

-public SimpleRealtimeSegmentDataManager(String segmentName, String tableNameWithType, int partitionGroupId,
+public StatelessRealtimeSegmentDataManager(String segmentName, String tableNameWithType, int partitionGroupId,
     SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, Schema schema,
     IndexLoadingConfig indexLoadingConfig, StreamConfig streamConfig, String startOffsetStr, String endOffsetStr,
     ServerMetrics serverMetrics)
@@ -133,7 +133,7 @@ public SimpleRealtimeSegmentDataManager(String segmentName, String tableNameWith
 _resourceTmpDir = new File(FileUtils.getTempDirectory(), "resourceTmpDir_" + System.currentTimeMillis());
 _resourceDataDir = new File(FileUtils.getTempDirectory(), "resourceDataDir_" + System.currentTimeMillis());;
 _serverMetrics = serverMetrics;
-_logger = LoggerFactory.getLogger(SimpleRealtimeSegmentDataManager.class.getName() + "_" + _segmentName);
+_logger = LoggerFactory.getLogger(StatelessRealtimeSegmentDataManager.class.getName() + "_" + _segmentName);

 _offsetFactory = StreamConsumerFactoryProvider.create(_streamConfig).createStreamMsgOffsetFactory();
 _startOffset = _offsetFactory.create(startOffsetStr);
