Pauseless Consumption #3: Disaster Recovery with Reingestion #14920

Merged on Feb 12, 2025 · 70 commits
Changes from 1 commit

Commits
6defce3
Controller side changes to allow pauseless ingestion:
9aman Dec 19, 2024
54ab7b3
Minor improvements to improve readability
9aman Dec 20, 2024
1e40134
Server side changes to enable pauseless consumption. The changes include
9aman Dec 23, 2024
012da87
Fixes in the commit protocol for pauseless ingestion to complete the …
9aman Dec 24, 2024
a97847f
Changing the way to enable pauseless post introduction of multi-stream…
9aman Dec 30, 2024
2c2ba86
WIP: Changes in the expected state while performing recovery during p…
9aman Dec 30, 2024
4d7c893
Adding changes for ensurePartitionsConsuming function that is utilize…
9aman Dec 30, 2024
a041a75
Add Server side Reingestion API
Dec 30, 2024
b6d0904
run segment level validation
Dec 30, 2024
58f6c51
Add method to trigger reingestion
Dec 30, 2024
d6313b3
Linting fixes
Dec 30, 2024
ca6134a
Adding integration tests for 3 failure scenarios that can occur durin…
9aman Jan 2, 2025
845f616
WIP: Reingestion test
Jan 6, 2025
d2dd313
Fix bug with indexes in reingestion
Jan 6, 2025
fb34fc8
Add tests for reingestion
Jan 8, 2025
50725bd
Fix controller url in Reingestion
Jan 14, 2025
e74d360
Support https and auth in reingestion
Jan 14, 2025
ce3c851
Merge branch 'master' into resolve-failures-pauseless-ingestion
9aman Jan 15, 2025
d6208a6
Removing checks on default crc and replacing them with segment status…
9aman Jan 15, 2025
7f5b720
Formatting improvements
9aman Jan 15, 2025
3f05b2f
Allowing null table config to be passed for checking pauseless consum…
9aman Jan 15, 2025
2012e38
Ensuring upload retries are backward compatible
9aman Jan 15, 2025
7b9da37
Removing unnecessary code
9aman Jan 15, 2025
ded8962
Fixing existing test cases and adding unit tests to check upload to d…
9aman Jan 16, 2025
e703d84
Refactor test cases to reduce repetition
9aman Jan 16, 2025
c2fda4a
Add missing header file
9aman Jan 16, 2025
c836009
Fix reingestion test
Jan 17, 2025
7974ab9
refactoring file upload download client
Jan 17, 2025
c08f841
Removing pauselessConsumptionEnabled from index config
9aman Jan 21, 2025
c4b99bd
Remove reingestion code
Jan 21, 2025
aee514c
Removing check in replaceSegmentIfCrcMismatch
9aman Jan 23, 2025
88a619a
Adding a new class for simple serialization and deserialization of ma…
9aman Jan 24, 2025
791ac21
Removing files related to reingestion tests
9aman Jan 24, 2025
8db5bae
Merging master and including force commit PR changes
9aman Jan 24, 2025
55b2b29
Revert "Remove reingestion code"
Jan 27, 2025
f74df66
Revert "Removing files related to reingestion tests"
Jan 27, 2025
1f4db11
Fix reingestion issue where consuming segments are not replaced
Jan 27, 2025
8e9249c
Copy full segment to deep store before triggering metadata upload
Jan 27, 2025
b804a69
Refactoring: added support for tracking running reingestion jobs
Jan 27, 2025
609942d
Refactor: fix doc comments
Jan 27, 2025
f939714
Make SegmentZKMetadata JSON serializable
Jackie-Jiang Jan 28, 2025
155c49f
Minor API name change
Jackie-Jiang Jan 28, 2025
080ec55
Refactor PinotLLC class to add ability to inject failures
Jan 28, 2025
7e04fa3
Minor improvements
9aman Jan 28, 2025
523913f
Moving ZkMetadataUtils to commons and reusing the code in the upload…
9aman Jan 28, 2025
5689333
Fix lint failures
Jan 28, 2025
f42c6b8
Misc fix and cleanup
Jackie-Jiang Jan 29, 2025
a2eebf9
The tests were running slow due to the condition that the IdealState …
9aman Jan 29, 2025
1ba5b6c
Refactor Reingestion integration test
Jan 29, 2025
11aa170
Merge remote-tracking branch 'upstream/master' into pauseless-reinges…
Jan 29, 2025
e84788a
Fix error in tests post rebase
Jan 29, 2025
0d46327
refactoring
Jan 30, 2025
a94c7e3
Remove redundant code
Feb 3, 2025
8b9b8d1
Add support for queue in reingestion
Feb 3, 2025
d035249
Refactoring
Feb 4, 2025
bc8a65b
refactoring
Feb 4, 2025
f082d24
Honour segment build semaphore during reingestion
Feb 4, 2025
d1ad30b
Fix test
Feb 5, 2025
7e47dd5
Add a separate API to upload reingested segments
Feb 5, 2025
5a42c28
Cleanup code
Feb 5, 2025
1551685
Replace reIngest with reingest
Feb 6, 2025
837aa26
Replace reIngest with reingest
Feb 6, 2025
09e8583
Ensure correctness in reingestion
Feb 6, 2025
3d6fdf4
Decouple reingest completion and upload ZK path
Feb 6, 2025
b525c7e
Refactor Stateless segment writer to have the least number of arguments
Feb 6, 2025
f7ae25f
Fix segment reset for pauseless tables
Feb 6, 2025
3c17957
Cleaning up Stateless segment writer
Feb 6, 2025
a3fa25c
Rename metric
Feb 6, 2025
61ebb23
Misc cleanup and logic decoupling
Jackie-Jiang Feb 12, 2025
62316c0
nit
Jackie-Jiang Feb 12, 2025
Refactor test cases to reduce repetition
9aman committed Jan 16, 2025
commit e703d84c1122e033bde5a8eb73128e16435218bb
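
In outline, this commit pulls the three failure-scenario tests under a shared template-method base class: the base class owns the cluster setup, fault toggling, and validation flow, while each subclass only supplies its fault point and the expected post-failure state. A condensed, self-contained sketch of that shape (simplified, hypothetical names; the real classes appear in the diff below):

// Sketch of the template-method refactor; names here are illustrative,
// the real classes are BasePauselessRealtimeIngestionTest and its subclasses.
abstract class BaseFailureScenarioTest {
  // Each concrete test supplies its fault point and expected post-failure state.
  protected abstract String getFailurePoint();
  protected abstract int getExpectedSegmentsWithFailure();

  // Shared flow, written once in the base class.
  final void runScenario() {
    injectFailure(getFailurePoint());
    verifySegmentCount(getExpectedSegmentsWithFailure());
    disableFailure(getFailurePoint());
    verifyRecovery();
  }

  void injectFailure(String fault) {
    System.out.println("enable fault: " + fault);
  }

  void disableFailure(String fault) {
    System.out.println("disable fault: " + fault);
  }

  void verifySegmentCount(int expected) {
    // In the real tests this asserts the ideal-state segment count.
  }

  void verifyRecovery() {
    // In the real tests this reruns the validation manager and waits for all docs.
  }
}

class IdealStateUpdateFailureScenario extends BaseFailureScenarioTest {
  @Override
  protected String getFailurePoint() {
    return "FAULT_BEFORE_IDEAL_STATE_UPDATE";
  }

  @Override
  protected int getExpectedSegmentsWithFailure() {
    return 2;
  }
}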
@@ -1231,7 +1231,8 @@ public void testUploadLLCSegmentToDeepStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
// mock the request/response for 1st segment upload
String serverUploadRequestUrl0 =
String.format("http://%s:%d/segments/%s/%s/uploadLLCSegmentToDeepStore?uploadTimeoutMs=-1", instance0, adminPort,
String.format("http://%s:%d/segments/%s/%s/uploadLLCSegmentToDeepStore?uploadTimeoutMs=-1", instance0,
adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
// tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
// with a random UUID
@@ -1262,7 +1263,8 @@ public void testUploadLLCSegmentToDeepStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1);
// mock the request/response for 2nd segment upload
String serverUploadRequestUrl1 =
String.format("http://%s:%d/segments/%s/%s/uploadLLCSegmentToDeepStore?uploadTimeoutMs=-1", instance1, adminPort,
String.format("http://%s:%d/segments/%s/%s/uploadLLCSegmentToDeepStore?uploadTimeoutMs=-1", instance1,
adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
when(segmentManager._mockedFileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl1)).thenThrow(
new HttpErrorStatusException("failed to upload segment",
@@ -0,0 +1,231 @@
package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertTrue;


public abstract class BasePauselessRealtimeIngestionTest extends BaseClusterIntegrationTest {
protected static final int NUM_REALTIME_SEGMENTS = 48;
protected static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
protected static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
protected static final String DEFAULT_TABLE_NAME_2 = DEFAULT_TABLE_NAME + "_2";

protected List<File> _avroFiles;
protected boolean _failureEnabled = false;
private static final Logger LOGGER = LoggerFactory.getLogger(BasePauselessRealtimeIngestionTest.class);

protected abstract String getFailurePoint();

protected abstract int getExpectedSegmentsWithFailure();

protected abstract int getExpectedZKMetadataWithFailure();

protected abstract long getCountStarResultWithFailure();

@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
500);
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
try {
LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(),
new URI(_controllerConfig.getDataDir()).getScheme());
serverConf.setProperty("pinot.server.instance.segment.store.uri",
"file:" + _controllerConfig.getDataDir());
serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
"true");
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
startController();
startBroker();
startServer();

setupNonPauselessTable();
injectFailure();
setupPauselessTable();
waitForAllDocsLoaded(600_000L);
}

private void setupNonPauselessTable()
throws Exception {
_avroFiles = unpackAvroData(_tempDir);
startKafka();
pushAvroIntoKafka(_avroFiles);

Schema schema = createSchema();
schema.setSchemaName(DEFAULT_TABLE_NAME_2);
addSchema(schema);

TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.setTableName(DEFAULT_TABLE_NAME_2);
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");
addTableConfig(tableConfig);

waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableConfig.getTableName());
return PauselessRealtimeTestUtils.assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");
}

private void setupPauselessTable()
throws Exception {
Schema schema = createSchema();
schema.setSchemaName(DEFAULT_TABLE_NAME);
addSchema(schema);

TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");

IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0)
.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);

addTableConfig(tableConfig);
}

protected void injectFailure() {
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.enableTestFault(getFailurePoint());
_failureEnabled = true;
}

protected void disableFailure() {
_failureEnabled = false;
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.disableTestFault(getFailurePoint());
}

@AfterClass
public void tearDown()
throws IOException {
LOGGER.info("Tearing down...");
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}

protected long getCountStarResult() {
return _failureEnabled ? getCountStarResultWithFailure() : DEFAULT_COUNT_STAR_RESULT;
}

protected void runValidationAndVerify()
throws Exception {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
String tableNameWithType2 = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2);

PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, getExpectedSegmentsWithFailure(), _helixManager);

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return segmentZKMetadataList.size() == getExpectedZKMetadataWithFailure();
}, 1000, 100000, "New Segment ZK Metadata not created");

Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
disableFailure();

_controllerStarter.getRealtimeSegmentValidationManager().run();

waitForAllDocsLoaded(600_000L);
waitForDocsLoaded(600_000L, true, tableNameWithType2);

PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS, _helixManager);
PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType2, NUM_REALTIME_SEGMENTS, _helixManager);

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return PauselessRealtimeTestUtils.assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

PauselessRealtimeTestUtils.compareZKMetadataForSegments(
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType),
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType2));
}

/**
* Basic test to verify segment assignment and metadata without any failures
*/
protected void testBasicSegmentAssignment() {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());

PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS, _helixManager);
assertTrue(PauselessConsumptionUtils.isPauselessEnabled(getRealtimeTableConfig()));

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return !hasSegmentsInStatus(segmentZKMetadataList, CommonConstants.Segment.Realtime.Status.COMMITTING);
}, 1000, 100000, "Some segments have status COMMITTING");

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return PauselessRealtimeTestUtils.assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");
}

private boolean hasSegmentsInStatus(List<SegmentZKMetadata> segmentZKMetadataList,
CommonConstants.Segment.Realtime.Status prohibitedStatus) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == prohibitedStatus) {
return true;
}
}
return false;
}
}
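
The base class above toggles faults through enableTestFault and disableTestFault on PinotLLCRealtimeSegmentManager, keyed by the FailureInjectionUtils constants. That manager-side implementation is not part of this commit; as a rough mental model only, such a fault gate can be sketched as follows (hypothetical class, not Pinot's actual code):

// Hypothetical sketch of a fault-injection gate, assuming a simple set of
// enabled fault names; Pinot's actual implementation may differ.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class FaultGate {
  private final Set<String> _enabledFaults = ConcurrentHashMap.newKeySet();

  void enableTestFault(String faultName) {
    _enabledFaults.add(faultName);
  }

  void disableTestFault(String faultName) {
    _enabledFaults.remove(faultName);
  }

  // Called at each guarded point, e.g. just before the ideal-state update.
  void maybeFail(String faultName) {
    if (_enabledFaults.contains(faultName)) {
      throw new RuntimeException("Injected failure: " + faultName);
    }
  }
}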
@@ -18,228 +18,50 @@
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;


public class PauselessRealtimeIngestionCommitEndMetadataFailureTest extends BaseClusterIntegrationTest {
public class PauselessRealtimeIngestionCommitEndMetadataFailureTest
extends BasePauselessRealtimeIngestionTest {

private static final int NUM_REALTIME_SEGMENTS = 48;
protected static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
private static final Logger LOGGER =
LoggerFactory.getLogger(PauselessRealtimeIngestionCommitEndMetadataFailureTest.class);
private static final String DEFAULT_TABLE_NAME_2 = DEFAULT_TABLE_NAME + "_2";
private List<File> _avroFiles;

protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
// Set the delay to more than the time we sleep before triggering RealtimeSegmentValidationManager manually, i.e.
// MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level validations are performed.
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
500);
@Override
protected String getFailurePoint() {
return FailureInjectionUtils.FAULT_BEFORE_COMMIT_END_METADATA;
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
// Set segment store uri to the one used by controller as data dir (i.e. deep store)
try {
LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(),
new URI(_controllerConfig.getDataDir()).getScheme());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir());
serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
"true");
protected int getExpectedSegmentsWithFailure() {
return NUM_REALTIME_SEGMENTS; // All segments still appear in ideal state
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
// Start a customized controller with more frequent realtime segment validation
startController();
startBroker();
startServer();

// load data in kafka
_avroFiles = unpackAvroData(_tempDir);
startKafka();
pushAvroIntoKafka(_avroFiles);

// create schema for non-pauseless table
Schema schema = createSchema();
schema.setSchemaName(DEFAULT_TABLE_NAME_2);
addSchema(schema);

// add non-pauseless table
TableConfig tableConfig2 = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig2.setTableName(DEFAULT_TABLE_NAME_2);
tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig2.getValidationConfig().setRetentionTimeValue("100000");
addTableConfig(tableConfig2);

// Ensure that the commit protocol for all the segments has completed before injecting failure
waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableConfig2.getTableName());
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

// inject failure in the commit protocol for the pauseless table
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.enableTestFault(FailureInjectionUtils.FAULT_BEFORE_COMMIT_END_METADATA);

// create schema for pauseless table
schema.setSchemaName(DEFAULT_TABLE_NAME);
addSchema(schema);

// add pauseless table
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");

// Replace stream config from indexing config to ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0)
.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
@Override
protected int getExpectedZKMetadataWithFailure() {
return NUM_REALTIME_SEGMENTS;
}

addTableConfig(tableConfig);
waitForAllDocsLoaded(600_000L);
@Override
protected long getCountStarResultWithFailure() {
return DEFAULT_COUNT_STAR_RESULT;
}

@Test
public void testSegmentAssignment()
throws Exception {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS);
assertUploadUrlEmpty(_helixResourceManager.getSegmentsZKMetadata(tableNameWithType));
// this sleep has been introduced to ensure that the RealtimeSegmentValidationManager can
// run segment level validations. The segment is not fixed by the validation manager if the required time
// has not elapsed
Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
_controllerStarter.getRealtimeSegmentValidationManager().run();
// wait for the url to show up after running validation manager
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");
PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS, _helixManager);

compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(tableNameWithType),
_helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2)));
}

private void compareZKMetadataForSegments(List<SegmentZKMetadata> segmentsZKMetadata,
List<SegmentZKMetadata> segmentsZKMetadata1) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata);
Map<String, SegmentZKMetadata> segmentZKMetadataMap1 = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata1);
segmentZKMetadataMap.forEach((segmentKey, segmentZKMetadata) -> {
SegmentZKMetadata segmentZKMetadata1 = segmentZKMetadataMap1.get(segmentKey);
areSegmentZkMetadataSame(segmentZKMetadata, segmentZKMetadata1);
});
}

private void areSegmentZkMetadataSame(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata segmentZKMetadata1) {
if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
return;
}
assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata1.getStatus());
assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata1.getStartOffset());
assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata1.getEndOffset());
assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata1.getTotalDocs());
assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata1.getStartTimeMs());
assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata1.getEndTimeMs());
}

private Map<String, SegmentZKMetadata> getPartitionSegmentNumberToMetadataMap(
List<SegmentZKMetadata> segmentsZKMetadata) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
String segmentKey = llcSegmentName.getPartitionGroupId() + "_" + llcSegmentName.getSequenceNumber();
segmentZKMetadataMap.put(segmentKey, segmentZKMetadata);
}
return segmentZKMetadataMap;
}

@AfterClass
public void tearDown()
throws IOException {
LOGGER.info("Tearing down...");
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}

private void verifyIdealState(String tableName, int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
}

private void assertUploadUrlEmpty(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
assertNull(segmentZKMetadata.getDownloadUrl());
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata metadata : segmentZKMetadataList) {
assertNull(metadata.getDownloadUrl());
}
}

private boolean assertUrlPresent(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
&& segmentZKMetadata.getDownloadUrl() == null) {
LOGGER.warn("URL not found for segment: {}", segmentZKMetadata.getSegmentName());
return false;
}
}
return true;
runValidationAndVerify();
}
}
@@ -18,259 +18,40 @@
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

public class PauselessRealtimeIngestionIdealStateUpdateFailureTest
extends BasePauselessRealtimeIngestionTest {

public class PauselessRealtimeIngestionIdealStateUpdateFailureTest extends BaseClusterIntegrationTest {
private static final int NUM_REALTIME_SEGMENTS = 48;
protected static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
private static final int NUM_REALTIME_SEGMENTS_WITH_FAILURE = 2;
private static final int NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE = 4;
protected static final long DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE = 5000;
private static final Logger LOGGER =
LoggerFactory.getLogger(PauselessRealtimeIngestionCommitEndMetadataFailureTest.class);
private static final String DEFAULT_TABLE_NAME_2 = DEFAULT_TABLE_NAME + "_2";
private List<File> _avroFiles;
private static boolean _failureEnabled = false;
private static final long DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE = 5000;

protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
// Set the delay to more than the time we sleep before triggering RealtimeSegmentValidationManager manually, i.e.
// MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level validations are performed.
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
500);
@Override
protected String getFailurePoint() {
return FailureInjectionUtils.FAULT_BEFORE_IDEAL_STATE_UPDATE;
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
// Set segment store uri to the one used by controller as data dir (i.e. deep store)
try {
LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(),
new URI(_controllerConfig.getDataDir()).getScheme());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir());
serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
"true");
protected int getExpectedSegmentsWithFailure() {
return NUM_REALTIME_SEGMENTS_WITH_FAILURE;
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
// Start a customized controller with more frequent realtime segment validation
startController();
startBroker();
startServer();

// load data in kafka
_avroFiles = unpackAvroData(_tempDir);
startKafka();
pushAvroIntoKafka(_avroFiles);

// create schema for non-pauseless table
Schema schema = createSchema();
schema.setSchemaName(DEFAULT_TABLE_NAME_2);
addSchema(schema);

// add non-pauseless table
TableConfig tableConfig2 = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig2.setTableName(DEFAULT_TABLE_NAME_2);
tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig2.getValidationConfig().setRetentionTimeValue("100000");
addTableConfig(tableConfig2);

// Ensure that the commit protocol for all the segments has completed before injecting failure
waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableConfig2.getTableName());
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

// inject failure in the commit protocol for the pauseless table
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.enableTestFault(FailureInjectionUtils.FAULT_BEFORE_IDEAL_STATE_UPDATE);
_failureEnabled = true;

// create schema for pauseless table
schema.setSchemaName(DEFAULT_TABLE_NAME);
addSchema(schema);

// add pauseless table
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");

// Replace stream config from indexing config to ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0)
.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
@Override
protected int getExpectedZKMetadataWithFailure() {
return NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE;
}

addTableConfig(tableConfig);
waitForAllDocsLoaded(600_000L);
@Override
protected long getCountStarResultWithFailure() {
return DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE;
}

@Test
public void testSegmentAssignment()
throws Exception {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
String tableNameWithType2 = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2);
// ensure that the metadata and ideal state only contain 2 segments.
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS_WITH_FAILURE);
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return segmentZKMetadataList.size() == NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE;
}, 1000, 100000, "New Segment ZK Metadata not created");

// this sleep has been introduced to ensure that the RealtimeSegmentValidationManager can
// run segment level validations. The segment is not fixed by the validation manager if the required time
// has not elapsed
Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
_failureEnabled = false;
// disable the injected failure in the commit protocol for the pauseless table
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.disableTestFault(FailureInjectionUtils.FAULT_BEFORE_IDEAL_STATE_UPDATE);

// Run validation manager. This should
// 1. Fix url for the segment that failed commit
// 2. Restart ingestion
_controllerStarter.getRealtimeSegmentValidationManager().run();
// verify all the documents are loaded for both the tables
waitForAllDocsLoaded(600_000L);
waitForDocsLoaded(600_000L, true, tableNameWithType2);
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS);
verifyIdealState(tableNameWithType2, NUM_REALTIME_SEGMENTS);
// wait for the url to show up after running validation manager

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(tableNameWithType),
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType2));
}

private void compareZKMetadataForSegments(List<SegmentZKMetadata> segmentsZKMetadata,
List<SegmentZKMetadata> segmentsZKMetadata1) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata);
Map<String, SegmentZKMetadata> segmentZKMetadataMap1 = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata1);
segmentZKMetadataMap.forEach((segmentKey, segmentZKMetadata) -> {
SegmentZKMetadata segmentZKMetadata1 = segmentZKMetadataMap1.get(segmentKey);
areSegmentZkMetadataSame(segmentZKMetadata, segmentZKMetadata1);
});
}

protected long getCountStarResult() {
if (_failureEnabled) {
return DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE;
}
return DEFAULT_COUNT_STAR_RESULT;
}

private void areSegmentZkMetadataSame(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata segmentZKMetadata1) {
if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
return;
}
assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata1.getStatus());
assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata1.getStartOffset());
assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata1.getEndOffset());
assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata1.getTotalDocs());
assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata1.getStartTimeMs());
assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata1.getEndTimeMs());
}

private Map<String, SegmentZKMetadata> getPartitionSegmentNumberToMetadataMap(
List<SegmentZKMetadata> segmentsZKMetadata) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
String segmentKey = llcSegmentName.getPartitionGroupId() + "_" + llcSegmentName.getSequenceNumber();
segmentZKMetadataMap.put(segmentKey, segmentZKMetadata);
}
return segmentZKMetadataMap;
}

@AfterClass
public void tearDown()
throws IOException {
LOGGER.info("Tearing down...");
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}

private void verifyIdealState(String tableName, int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
}

private void assertUploadUrlEmpty(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
assertNull(segmentZKMetadata.getDownloadUrl());
}
}

private boolean assertUrlPresent(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
&& segmentZKMetadata.getDownloadUrl() == null) {
LOGGER.warn("URL not found for segment: {}", segmentZKMetadata.getSegmentName());
return false;
}
}
return true;
runValidationAndVerify();
}
}
@@ -18,159 +18,44 @@
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


public class PauselessRealtimeIngestionIntegrationTest extends BaseClusterIntegrationTest {

private static final int NUM_REALTIME_SEGMENTS = 48;
private static final Logger LOGGER = LoggerFactory.getLogger(PauselessRealtimeIngestionIntegrationTest.class);
private List<File> _avroFiles;

protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
}
public class PauselessRealtimeIngestionIntegrationTest extends BasePauselessRealtimeIngestionTest {

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
// Set segment store uri to the one used by controller as data dir (i.e. deep store)
try {
LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(),
new URI(_controllerConfig.getDataDir()).getScheme());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir());
serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
"true");
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
// Start a customized controller with more frequent realtime segment validation
startController();
startBroker();
startServer();

_avroFiles = unpackAvroData(_tempDir);
startKafka();
pushAvroIntoKafka(_avroFiles);

Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
// Replace stream config from indexing config to ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
addTableConfig(tableConfig);

waitForAllDocsLoaded(600_000L);
protected String getFailurePoint() {
return null; // No failure point for basic test
}

@Test(description = "Ensure that all the segments are ingested, built and uploaded when pauseless consumption is "
+ "enabled")
public void testSegmentAssignment()
throws Exception {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS);
assertTrue(PauselessConsumptionUtils.isPauselessEnabled(getRealtimeTableConfig()));
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return assertNoSegmentInProhibitedStatus(segmentZKMetadataList,
CommonConstants.Segment.Realtime.Status.COMMITTING);
}, 1000, 100000, "Some segments have status COMMITTING");
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");
@Override
protected int getExpectedSegmentsWithFailure() {
return NUM_REALTIME_SEGMENTS; // Always expect full segments
}

@AfterClass
public void tearDown()
throws IOException {
LOGGER.info("Tearing down...");
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
@Override
protected int getExpectedZKMetadataWithFailure() {
return NUM_REALTIME_SEGMENTS; // Always expect full metadata
}

private void verifyIdealState(String tableName, int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
@Override
protected long getCountStarResultWithFailure() {
return DEFAULT_COUNT_STAR_RESULT; // Always expect full count
}

private boolean assertUrlPresent(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE
&& segmentZKMetadata.getDownloadUrl() == null) {
System.out.println("URL not found for segment: " + segmentZKMetadata.getSegmentName());
return false;
}
}
return true;
@Override
protected void injectFailure() {
// Do nothing - no failure to inject
}

private boolean assertNoSegmentInProhibitedStatus(List<SegmentZKMetadata> segmentZKMetadataList,
CommonConstants.Segment.Realtime.Status prohibitedStatus) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == prohibitedStatus) {
return false;
}
}
return true;
@Override
protected void disableFailure() {
// Do nothing - no failure to disable
}

@Override
protected Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigMap = getStreamConfigMap();
streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
return streamConfigMap;
@Test(description = "Ensure that all the segments are ingested, built and uploaded when pauseless consumption is "
+ "enabled")
public void testSegmentAssignment() {
testBasicSegmentAssignment();
}
}
@@ -18,259 +18,40 @@
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

public class PauselessRealtimeIngestionNewSegmentMetadataCreationFailureTest
extends BasePauselessRealtimeIngestionTest {

public class PauselessRealtimeIngestionNewSegmentMetadataCreationFailureTest extends BaseClusterIntegrationTest {
private static final int NUM_REALTIME_SEGMENTS = 48;
protected static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 5 MINUTES
private static final int NUM_REALTIME_SEGMENTS_WITH_FAILURE = 2;
private static final int NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE = 2;
protected static final long DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE = 5000;
private static final Logger LOGGER =
LoggerFactory.getLogger(PauselessRealtimeIngestionCommitEndMetadataFailureTest.class);
private static final String DEFAULT_TABLE_NAME_2 = DEFAULT_TABLE_NAME + "_2";
private List<File> _avroFiles;
private static boolean _failureEnabled = false;
private static final long DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE = 5000;

protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
// Set the delay to more than the time we sleep before triggering RealtimeSegmentValidationManager manually, i.e.
// MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level validations are performed.
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
500);
@Override
protected String getFailurePoint() {
return FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION;
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
// Set segment store uri to the one used by controller as data dir (i.e. deep store)
try {
LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(),
new URI(_controllerConfig.getDataDir()).getScheme());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir());
serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
"true");
protected int getExpectedSegmentsWithFailure() {
return NUM_REALTIME_SEGMENTS_WITH_FAILURE;
}

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
// Start a customized controller with more frequent realtime segment validation
startController();
startBroker();
startServer();

// load data in kafka
_avroFiles = unpackAvroData(_tempDir);
startKafka();
pushAvroIntoKafka(_avroFiles);

// create schema for non-pauseless table
Schema schema = createSchema();
schema.setSchemaName(DEFAULT_TABLE_NAME_2);
addSchema(schema);

// add non-pauseless table
TableConfig tableConfig2 = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig2.setTableName(DEFAULT_TABLE_NAME_2);
tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig2.getValidationConfig().setRetentionTimeValue("100000");
addTableConfig(tableConfig2);

// Ensure that the commit protocol for all the segments has completed before injecting failure
waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableConfig2.getTableName());
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

// inject failure in the commit protocol for the pauseless table
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.enableTestFault(FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION);
_failureEnabled = true;

// create schema for pauseless table
schema.setSchemaName(DEFAULT_TABLE_NAME);
addSchema(schema);

// add pauseless table
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");

// Replace stream config from indexing config to ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0)
.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
@Override
protected int getExpectedZKMetadataWithFailure() {
return NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE;
}

addTableConfig(tableConfig);
waitForAllDocsLoaded(600_000L);
@Override
protected long getCountStarResultWithFailure() {
return DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE;
}

@Test
public void testSegmentAssignment()
throws Exception {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
String tableNameWithType2 = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2);
// ensure that the metadata and ideal state only contain 2 segments.
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS_WITH_FAILURE);
TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return segmentZKMetadataList.size() == NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE;
}, 1000, 100000, "New Segment ZK Metadata not created");

// this sleep has been introduced to ensure that the RealtimeSegmentValidationManager can
// run segment level validations. The segment is not fixed by the validation manager if the required time
// has not elapsed
Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
_failureEnabled = false;
// disable the injected failure in the commit protocol for the pauseless table
_helixResourceManager.getPinotLLCRealtimeSegmentManager()
.disableTestFault(FailureInjectionUtils.FAULT_BEFORE_NEW_SEGMENT_METADATA_CREATION);

// Run validation manager. This should
// 1. Fix url for the segment that failed commit
// 2. Restart ingestion
_controllerStarter.getRealtimeSegmentValidationManager().run();
// verify all the documents are loaded for both the tables
waitForAllDocsLoaded(600_000L);
waitForDocsLoaded(600_000L, true, tableNameWithType2);
verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS);
verifyIdealState(tableNameWithType2, NUM_REALTIME_SEGMENTS);
// wait for the url to show up after running validation manager

TestUtils.waitForCondition((aVoid) -> {
List<SegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
return assertUrlPresent(segmentZKMetadataList);
}, 1000, 100000, "Some segments still have missing url");

compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(tableNameWithType),
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType2));
}

private void compareZKMetadataForSegments(List<SegmentZKMetadata> segmentsZKMetadata,
List<SegmentZKMetadata> segmentsZKMetadata1) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata);
Map<String, SegmentZKMetadata> segmentZKMetadataMap1 = getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata1);
segmentZKMetadataMap.forEach((segmentKey, segmentZKMetadata) -> {
SegmentZKMetadata segmentZKMetadata1 = segmentZKMetadataMap1.get(segmentKey);
areSegmentZkMetadataSame(segmentZKMetadata, segmentZKMetadata1);
});
}

protected long getCountStarResult() {
if (_failureEnabled) {
return DEFAULT_COUNT_STAR_RESULT_WITH_FAILURE;
}
return DEFAULT_COUNT_STAR_RESULT;
}

private void areSegmentZkMetadataSame(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata segmentZKMetadata1) {
if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
return;
}
assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata1.getStatus());
assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata1.getStartOffset());
assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata1.getEndOffset());
assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata1.getTotalDocs());
assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata1.getStartTimeMs());
assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata1.getEndTimeMs());
}

private Map<String, SegmentZKMetadata> getPartitionSegmentNumberToMetadataMap(
List<SegmentZKMetadata> segmentsZKMetadata) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
String segmentKey = llcSegmentName.getPartitionGroupId() + "_" + llcSegmentName.getSequenceNumber();
segmentZKMetadataMap.put(segmentKey, segmentZKMetadata);
}
return segmentZKMetadataMap;
}

@AfterClass
public void tearDown()
throws IOException {
LOGGER.info("Tearing down...");
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}

private void verifyIdealState(String tableName, int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
}

private void assertUploadUrlEmpty(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
assertNull(segmentZKMetadata.getDownloadUrl());
}
}

private boolean assertUrlPresent(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
&& segmentZKMetadata.getDownloadUrl() == null) {
LOGGER.warn("URL not found for segment: {}", segmentZKMetadata.getSegmentName());
return false;
}
}
return true;
runValidationAndVerify();
}
}
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests.realtime.utils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.utils.CommonConstants;

import static org.testng.Assert.assertEquals;


public class PauselessRealtimeTestUtils {

private PauselessRealtimeTestUtils() {
}

public static void verifyIdealState(String tableName, int numSegmentsExpected, HelixManager helixManager) {
IdealState idealState = HelixHelper.getTableIdealState(helixManager, tableName);
Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
}

public static boolean assertUrlPresent(List<SegmentZKMetadata> segmentZKMetadataList) {
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
&& segmentZKMetadata.getDownloadUrl() == null) {
return false;
}
}
return true;
}

public static void compareZKMetadataForSegments(List<SegmentZKMetadata> segmentsZKMetadata,
List<SegmentZKMetadata> segmentsZKMetadata1) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap =
getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata);
Map<String, SegmentZKMetadata> segmentZKMetadataMap1 =
getPartitionSegmentNumberToMetadataMap(segmentsZKMetadata1);

segmentZKMetadataMap.forEach((segmentKey, segmentZKMetadata) -> {
SegmentZKMetadata segmentZKMetadata1 = segmentZKMetadataMap1.get(segmentKey);
compareSegmentZkMetadata(segmentZKMetadata, segmentZKMetadata1);
});
}

private static Map<String, SegmentZKMetadata> getPartitionSegmentNumberToMetadataMap(
List<SegmentZKMetadata> segmentsZKMetadata) {
Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
String segmentKey = llcSegmentName.getPartitionGroupId() + "_" + llcSegmentName.getSequenceNumber();
segmentZKMetadataMap.put(segmentKey, segmentZKMetadata);
}
return segmentZKMetadataMap;
}

private static void compareSegmentZkMetadata(SegmentZKMetadata segmentZKMetadata,
SegmentZKMetadata segmentZKMetadata1) {
if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
return;
}
assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata1.getStatus());
assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata1.getStartOffset());
assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata1.getEndOffset());
assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata1.getTotalDocs());
assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata1.getStartTimeMs());
assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata1.getEndTimeMs());
}
}
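
For reference, a condensed sketch of how these utilities are driven from the tests above; it mirrors runValidationAndVerify() in BasePauselessRealtimeIngestionTest, and _helixResourceManager, _helixManager, and NUM_REALTIME_SEGMENTS come from the test base classes:

// Condensed usage sketch; mirrors the polling pattern in the base test.
private void waitForUrlsAndVerify(String tableNameWithType) {
  TestUtils.waitForCondition((aVoid) -> {
    List<SegmentZKMetadata> segmentZKMetadataList =
        _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
    return PauselessRealtimeTestUtils.assertUrlPresent(segmentZKMetadataList);
  }, 1000, 100000, "Some segments still have missing url");
  PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS, _helixManager);
}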