Skip to content

Commit

Permalink
Add segment StarTree index rebuild throttler configurable via ZK cluster configs (#14943)
Browse files Browse the repository at this point in the history
  • Loading branch information
somandal authored Feb 5, 2025
1 parent 8313b32 commit e9ee474
Show file tree
Hide file tree
Showing 20 changed files with 989 additions and 523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
Expand Down Expand Up @@ -118,7 +120,8 @@ private TableDataManager makeTestableManager()
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
SegmentPreprocessThrottler segmentPreprocessThrottler = new SegmentPreprocessThrottler(8, 10, true);
SegmentPreprocessThrottler segmentPreprocessThrottler = new SegmentPreprocessThrottler(
new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true));
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, null,
null, segmentPreprocessThrottler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
Expand Down Expand Up @@ -97,8 +99,8 @@ public class BaseTableDataManagerTest {
private static final Schema SCHEMA =
new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
.addMetric(LONG_COLUMN, DataType.LONG).build();
static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(2, 4,
true);
static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(
new SegmentAllIndexPreprocessThrottler(2, 4, true), new SegmentStarTreePreprocessThrottler(2, 4, true));

@BeforeClass
public void setUp()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
Expand Down Expand Up @@ -81,8 +83,8 @@ public class TableIndexingTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "TableIndexingTest");
private static final String TABLE_NAME = "mytable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
private static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(2, 4,
true);
private static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(
new SegmentAllIndexPreprocessThrottler(2, 4, true), new SegmentStarTreePreprocessThrottler(1, 2, true));
public static final String COLUMN_NAME = "col";
public static final String COLUMN_DAY_NAME = "$col$DAY";
public static final String COLUMN_MONTH_NAME = "$col$MONTH";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderTest;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
Expand Down Expand Up @@ -77,8 +79,8 @@ public class DimensionTableDataManagerTest {
private static final String CSV_DATA_PATH = "data/dimBaseballTeams.csv";
private static final String SCHEMA_PATH = "data/dimBaseballTeams_schema.json";
private static final String TABLE_CONFIG_PATH = "data/dimBaseballTeams_config.json";
private static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(1, 2,
true);
private static final SegmentPreprocessThrottler SEGMENT_PREPROCESS_THROTTLER = new SegmentPreprocessThrottler(
new SegmentAllIndexPreprocessThrottler(1, 2, true), new SegmentStarTreePreprocessThrottler(1, 2, true));

private File _indexDir;
private IndexLoadingConfig _indexLoadingConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public void setUp()
// Set max segment preprocess parallelism to 8
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(8));
// Set max segment startree preprocess parallelism to 6
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));

startBroker();
startServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ protected void startHybridCluster()
// Set max segment preprocess parallelism to 10
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(10));
// Set max segment startree preprocess parallelism to 6
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));
startBroker();
startServers(2);
startKafka();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public void setUp()
// Set max segment preprocess parallelism to 8 to test that all segments can be processed
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(8));
// Set max segment startree preprocess parallelism to 6 to test that all segments can be processed
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));
startBrokers();
startServers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
Expand All @@ -53,6 +56,12 @@ public void setUp()
// Start the Pinot cluster
startZk();
startController();
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
// Set max segment startree preprocess parallelism to 8
_helixManager.getConfigAccessor()
.set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(8));
startBroker();
startServer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,15 @@ public static void preprocess(File indexDir, IndexLoadingConfig indexLoadingConf
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
if (segmentMetadata.getTotalDocs() > 0) {
if (segmentPreprocessThrottler != null) {
segmentPreprocessThrottler.acquire();
segmentPreprocessThrottler.getSegmentAllIndexPreprocessThrottler().acquire();
}
try {
convertSegmentFormat(indexDir, indexLoadingConfig, segmentMetadata);
preprocessSegment(indexDir, segmentMetadata.getName(), segmentMetadata.getCrc(), indexLoadingConfig, schema);
preprocessSegment(indexDir, segmentMetadata.getName(), segmentMetadata.getCrc(), indexLoadingConfig, schema,
segmentPreprocessThrottler);
} finally {
if (segmentPreprocessThrottler != null) {
segmentPreprocessThrottler.release();
segmentPreprocessThrottler.getSegmentAllIndexPreprocessThrottler().release();
}
}
}
Expand Down Expand Up @@ -322,7 +323,8 @@ private static void convertSegmentFormat(File indexDir, IndexLoadingConfig index
}

private static void preprocessSegment(File indexDir, String segmentName, String segmentCrc,
IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema)
IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler)
throws Exception {
PinotConfiguration segmentDirectoryConfigs = indexLoadingConfig.getSegmentDirectoryConfigs();
SegmentDirectoryLoaderContext segmentLoaderContext =
Expand All @@ -337,7 +339,7 @@ private static void preprocessSegment(File indexDir, String segmentName, String
SegmentDirectory segmentDirectory =
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(), segmentLoaderContext);
try (SegmentPreProcessor preProcessor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema)) {
preProcessor.process();
preProcessor.process(segmentPreprocessThrottler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.IndexHandler;
import org.apache.pinot.segment.spi.index.IndexService;
Expand Down Expand Up @@ -81,6 +82,11 @@ public void close()

/**
 * Runs segment preprocessing without a preprocess throttler.
 *
 * <p>Delegates to {@link #process(SegmentPreprocessThrottler)} with a {@code null} throttler.
 *
 * @throws Exception propagated from the delegated call
 */
public void process()
throws Exception {
process(null);
}

public void process(@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler)
throws Exception {
if (_segmentMetadata.getTotalDocs() == 0) {
LOGGER.info("Skip preprocessing empty segment: {}", _segmentMetadata.getName());
return;
Expand Down Expand Up @@ -154,7 +160,7 @@ public void process()
// that the other required indices (e.g. forward index) are up-to-date.
try (SegmentDirectory.Writer segmentWriter = _segmentDirectory.createWriter()) {
// Create/modify/remove star-trees if required.
processStarTrees(indexDir);
processStarTrees(indexDir, segmentPreprocessThrottler);
_segmentDirectory.reloadMetadata();
segmentWriter.save();
}
Expand Down Expand Up @@ -238,22 +244,22 @@ private boolean needProcessStarTrees() {
return !starTreeBuilderConfigs.isEmpty();
}

private void processStarTrees(File indexDir)
private void processStarTrees(File indexDir,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler)
throws Exception {
// Create/modify/remove star-trees if required
if (_indexLoadingConfig.isEnableDynamicStarTreeCreation()) {
List<StarTreeV2BuilderConfig> starTreeBuilderConfigs = StarTreeBuilderUtils
.generateBuilderConfigs(_indexLoadingConfig.getStarTreeIndexConfigs(),
_indexLoadingConfig.isEnableDefaultStarTree(), _segmentMetadata);
boolean shouldGenerateStarTree = !starTreeBuilderConfigs.isEmpty();
boolean shouldRemoveStarTree = false;
List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList();
if (starTreeMetadataList != null) {
// There are existing star-trees
if (!shouldGenerateStarTree) {
// Newer config does not have star-trees. Delete all existing star-trees.
LOGGER.info("Removing star-trees from segment: {}", _segmentMetadata.getName());
StarTreeBuilderUtils.removeStarTrees(indexDir);
_segmentMetadata = new SegmentMetadataImpl(indexDir);
shouldRemoveStarTree = true;
} else if (StarTreeBuilderUtils.shouldModifyExistingStarTrees(starTreeBuilderConfigs, starTreeMetadataList)) {
// Existing and newer both have star-trees, but they don't match. Rebuild the star-trees.
LOGGER.info("Change detected in star-trees for segment: {}", _segmentMetadata.getName());
Expand All @@ -262,14 +268,30 @@ private void processStarTrees(File indexDir)
shouldGenerateStarTree = false;
}
}
// Generate the star-trees if needed
if (shouldGenerateStarTree) {
// NOTE: Always use OFF_HEAP mode on server side.
try (MultipleTreesBuilder builder = new MultipleTreesBuilder(starTreeBuilderConfigs, indexDir,
MultipleTreesBuilder.BuildMode.OFF_HEAP)) {
builder.build();
// Generate/remove the star-trees if needed
if (shouldGenerateStarTree || shouldRemoveStarTree) {
if (segmentPreprocessThrottler != null) {
segmentPreprocessThrottler.getSegmentStarTreePreprocessThrottler().acquire();
}

try {
if (shouldRemoveStarTree) {
// 'shouldGenerateStarTree' should be false if they need to be removed
LOGGER.info("Removing star-trees from segment: {}", _segmentMetadata.getName());
StarTreeBuilderUtils.removeStarTrees(indexDir);
} else {
// NOTE: Always use OFF_HEAP mode on server side.
try (MultipleTreesBuilder builder = new MultipleTreesBuilder(starTreeBuilderConfigs, indexDir,
MultipleTreesBuilder.BuildMode.OFF_HEAP)) {
builder.build();
}
}
_segmentMetadata = new SegmentMetadataImpl(indexDir);
} finally {
if (segmentPreprocessThrottler != null) {
segmentPreprocessThrottler.getSegmentStarTreePreprocessThrottler().release();
}
}
_segmentMetadata = new SegmentMetadataImpl(indexDir);
}
}
}
Expand Down
Loading

0 comments on commit e9ee474

Please sign in to comment.