diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 8c030f776626..9d4c2bd2a98f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -71,6 +70,7 @@ import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils; import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig; +import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler; import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler; import org.apache.pinot.segment.spi.ColumnMetadata; @@ -91,6 +91,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -127,9 +128,9 @@ public abstract class BaseTableDataManager implements TableDataManager { protected long _streamSegmentDownloadUntarRateLimitBytesPerSec; protected boolean _isStreamSegmentDownloadUntar; protected SegmentPreprocessThrottler _segmentPreprocessThrottler; + protected PinotClusterConfigProvider _clusterConfigProvider; // Semaphore to restrict the maximum number of parallel segment downloads for a table - // TODO: Make this configurable via ZK cluster configs to avoid server restarts to update - private Semaphore _segmentDownloadSemaphore; + private SegmentDownloadThrottler _segmentDownloadSemaphore; // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value. 
protected Cache<Pair<String, String>, SegmentErrorInfo> _errorCache; @@ -142,7 +143,8 @@ public abstract class BaseTableDataManager implements TableDataManager { public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider) { LOGGER.info("Initializing table data manager for table: {}", tableConfig.getTableName()); _instanceDataManagerConfig = instanceDataManagerConfig; @@ -171,6 +173,7 @@ public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManag } _errorCache = errorCache; _segmentPreprocessThrottler = segmentPreprocessThrottler; + _clusterConfigProvider = clusterConfigProvider; _recentlyDeletedSegments = CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize()) .expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES).build(); @@ -196,14 +199,23 @@ public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManag _streamSegmentDownloadUntarRateLimitBytesPerSec); } int maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads(); - if (maxParallelSegmentDownloads > 0) { - LOGGER.info( - "Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}", - _tableNameWithType, maxParallelSegmentDownloads); - _segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads, true); - } else { - _segmentDownloadSemaphore = null; + // For backward compatibility, reset a zero or negative value to the default + // Earlier this config could be set to a value <= 0, in which case download throttling was completely disabled + if (maxParallelSegmentDownloads <= 0) { + LOGGER.warn("Segment download config: {} found to be <= 0 (value: {}), resetting it to the default: {}", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, maxParallelSegmentDownloads, + CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); + maxParallelSegmentDownloads = Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); + } + LOGGER.info( + "Construct segment download semaphore for Table: {}. 
Maximum number of parallel segment downloads: {}", + _tableNameWithType, maxParallelSegmentDownloads); + _segmentDownloadSemaphore = new SegmentDownloadThrottler(maxParallelSegmentDownloads, _tableNameWithType); + // If the cluster config provider exists, register the download semaphore to get dynamic config updates + if (_clusterConfigProvider != null) { + _clusterConfigProvider.registerClusterConfigChangeListener(_segmentDownloadSemaphore); } + _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName()); doInit(); @@ -806,14 +818,12 @@ protected File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) String downloadUrl = zkMetadata.getDownloadUrl(); _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl); File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - if (_segmentDownloadSemaphore != null) { - long startTime = System.currentTimeMillis(); - _logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName, - _segmentDownloadSemaphore.getQueueLength()); - _segmentDownloadSemaphore.acquire(); - _logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).", - segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); - } + long startTime = System.currentTimeMillis(); + _logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName, + _segmentDownloadSemaphore.getQueueLength()); + _segmentDownloadSemaphore.acquire(); + _logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).", + segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); try { File untarredSegmentDir; if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { @@ -843,9 +853,7 @@ protected File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1); throw e; } finally { - if (_segmentDownloadSemaphore != null) { - _segmentDownloadSemaphore.release(); - } + _segmentDownloadSemaphore.release(); FileUtils.deleteQuietly(tempRootDir); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index e9ba28395516..4f7ec3e68414 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -34,6 +34,7 @@ import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.annotations.InterfaceAudience; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.env.PinotConfiguration; @@ -51,7 +52,8 @@ public interface InstanceDataManager { *

NOTE: The config is the subset of server config with prefix 'pinot.server.instance' */ void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider) throws Exception; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index 9da08dd85ca0..286e1a034790 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; @@ -50,16 +51,19 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider private SegmentLocks _segmentLocks; private Semaphore _segmentBuildSemaphore; private SegmentPreprocessThrottler _segmentPreprocessThrottler; + private PinotClusterConfigProvider _clusterConfigProvider; @Override public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, - SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) { + SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider) { _instanceDataManagerConfig = instanceDataManagerConfig; _helixManager = helixManager; _segmentLocks = segmentLocks; int maxParallelSegmentBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds(); _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? 
new Semaphore(maxParallelSegmentBuilds, true) : null; _segmentPreprocessThrottler = segmentPreprocessThrottler; + _clusterConfigProvider = clusterConfigProvider; } @Override @@ -89,7 +93,7 @@ public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable E throw new IllegalStateException(); } tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentPreloadExecutor, - errorCache, _segmentPreprocessThrottler); + errorCache, _segmentPreprocessThrottler, _clusterConfigProvider); return tableDataManager; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java index 95d0ad34d625..2958bdc599d9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java @@ -30,6 +30,7 @@ import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.config.table.TableConfig; @@ -40,7 +41,8 @@ public interface TableDataManagerProvider { void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler); + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider); default TableDataManager getTableDataManager(TableConfig tableConfig) { return getTableDataManager(tableConfig, null, null, () -> true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index 93c490fd4f31..9ab873bf589c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -124,7 +124,7 @@ private TableDataManager makeTestableManager() new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true)); TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, null, - null, segmentPreprocessThrottler); + null, segmentPreprocessThrottler, null); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); segsMapField.setAccessible(true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index bff09b7ea81e..f10bdea27aa0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -662,7 +662,7 @@ private static OfflineTableDataManager createTableManager(InstanceDataManagerCon SegmentLocks segmentLocks = new SegmentLocks(); 
OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, helixManager, segmentLocks, DEFAULT_TABLE_CONFIG, null, null, - SEGMENT_PREPROCESS_THROTTLER); + SEGMENT_PREPROCESS_THROTTLER, null); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index db8053519238..2d3b54aa6628 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -152,7 +152,7 @@ private DimensionTableDataManager makeTableDataManager(HelixManager helixManager DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME); tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, null, null, - SEGMENT_PREPROCESS_THROTTLER); + SEGMENT_PREPROCESS_THROTTLER, null); tableDataManager.start(); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 4d63fa455f18..c9a362c6108e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -794,7 +794,7 @@ public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor() InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null); + tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null, null); TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); tableDataManager.shutDown(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java index 509f18ea8569..ec7d441cc6b0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java @@ -135,7 +135,7 @@ public void setUp() InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null); TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); //we 
don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 04a908e697d1..b732b51bc134 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -160,7 +160,7 @@ public void setUp() InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null); TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); for (ImmutableSegment indexSegment : _indexSegments) { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index 958791f93f2c..1144969d4a0e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -278,7 +278,7 @@ public void setUp() InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null); TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG); tableDataManager.start(); for (IndexSegment indexSegment : _indexSegments) { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java index 1c509ca5e8c8..0dce6ea133f3 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java @@ -140,7 +140,7 @@ public void setup() InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null); TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); tableDataManager.addSegment(_segment); diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index f2e6ed6f3ad7..012fed5c5986 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -39,6 +39,7 @@ import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -55,7 +56,8 @@ public interface TableDataManager { void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler); + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider); /** * Returns the instance id of the server. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java new file mode 100644 index 000000000000..6bb11dc676dd --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.Set; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pinot.common.concurrency.AdjustableSemaphore; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Used to throttle the total number of concurrent deep store segment downloads that can happen on a given Pinot server. + */ +public class SegmentDownloadThrottler implements PinotClusterConfigChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDownloadThrottler.class); + + /** + * _maxDownloadConcurrency must be > 0. 
To effectively disable throttling, this can be set to a very high value. + */ + private int _maxDownloadConcurrency; + private final String _tableNameWithType; + private final AdjustableSemaphore _semaphore; + + /** + * @param maxDownloadConcurrency configured download concurrency + * @param tableNameWithType table name with type, used in log messages + */ + public SegmentDownloadThrottler(int maxDownloadConcurrency, String tableNameWithType) { + LOGGER.info("Initializing SegmentDownloadThrottler, maxDownloadConcurrency: {}", maxDownloadConcurrency); + Preconditions.checkArgument(maxDownloadConcurrency > 0, + "Max download parallelism must be > 0, but found to be: " + maxDownloadConcurrency); + + _maxDownloadConcurrency = maxDownloadConcurrency; + _tableNameWithType = tableNameWithType; + + _semaphore = new AdjustableSemaphore(_maxDownloadConcurrency, true); + LOGGER.info("Created download semaphore for table: {} with total permits: {}, available permits: {}", + _tableNameWithType, totalPermits(), availablePermits()); + } + + @Override + public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { + if (CollectionUtils.isEmpty(changedConfigs)) { + LOGGER.info("Skip updating SegmentDownloadThrottler configs for table: {} with unchanged clusterConfigs", + _tableNameWithType); + return; + } + + LOGGER.info("Updating SegmentDownloadThrottler configs for table: {} with latest clusterConfigs", + _tableNameWithType); + if (!changedConfigs.contains(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS)) { + LOGGER.info("changedConfigs list indicates config: {} for table: {} was not updated, skipping updates", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, _tableNameWithType); + return; + } + + String configName = CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS; + String defaultConfigValue = CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS; + String maxParallelSegmentDownloadsStr = + clusterConfigs == null ? 
defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); + + int maxDownloadConcurrency; + try { + maxDownloadConcurrency = Integer.parseInt(maxParallelSegmentDownloadsStr); + } catch (Exception e) { + LOGGER.warn("Invalid config: {} for table: {} set to: {}, not making change, fix config and try again", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, _tableNameWithType, + maxParallelSegmentDownloadsStr); + return; + } + + if (maxDownloadConcurrency <= 0) { + LOGGER.warn("Invalid config: {} for table: {}, value: {} must be > 0, not making change, fix config and try again", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, _tableNameWithType, maxDownloadConcurrency); + return; + } + + if (maxDownloadConcurrency == _maxDownloadConcurrency) { + LOGGER.info("No ZK update for config: {} for table: {} value: {}, total permits: {}", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, _tableNameWithType, _maxDownloadConcurrency, + totalPermits()); + return; + } + + LOGGER.info("Updated config: {} for table: {} from: {} to: {}", + CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, _tableNameWithType, _maxDownloadConcurrency, + maxDownloadConcurrency); + _maxDownloadConcurrency = maxDownloadConcurrency; + + _semaphore.setPermits(_maxDownloadConcurrency); + LOGGER.info("Updated total table download permits: {}", totalPermits()); + LOGGER.info("Updated SegmentDownloadThrottler configs for table: {} with latest clusterConfigs", + _tableNameWithType); + } + + /** + * Block trying to acquire the semaphore to perform the segment download unless interrupted. + *

+ * {@link #release()} should be called after the segment download completes. It is the responsibility of the caller + * to ensure that {@link #release()} is called exactly once for each call to this method. + * + * @throws InterruptedException if the current thread is interrupted + */ + public void acquire() + throws InterruptedException { + _semaphore.acquire(); + } + + /** + * Should be called after the segment download completes. It is the responsibility of the caller to ensure that + * this method is called exactly once for each call to {@link #acquire()}. + */ + public void release() { + _semaphore.release(); + } + + /** + * Get the estimated number of threads waiting for the semaphore. + * @return the estimated number of threads waiting for the semaphore + */ + public int getQueueLength() { + return _semaphore.getQueueLength(); + } + + @VisibleForTesting + int availablePermits() { + return _semaphore.availablePermits(); + } + + @VisibleForTesting + int totalPermits() { + return _semaphore.getTotalPermits(); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottlerTest.java new file mode 100644 index 000000000000..a626c565a58e --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottlerTest.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.spi.utils.CommonConstants; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + + +public class SegmentDownloadThrottlerTest { + private static final String TABLE_NAME_WITH_TYPE = "test_OFFLINE"; + + @Test + public void testBasicAcquireRelease() + throws Exception { + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(4, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.availablePermits(), 4); + Assert.assertEquals(downloadThrottler.totalPermits(), 4); + + downloadThrottler.acquire(); + Assert.assertEquals(downloadThrottler.availablePermits(), 3); + Assert.assertEquals(downloadThrottler.totalPermits(), 4); + + downloadThrottler.release(); + Assert.assertEquals(downloadThrottler.availablePermits(), 4); + Assert.assertEquals(downloadThrottler.totalPermits(), 4); + } + + @Test + public void testBasicAcquireAllPermits() + throws Exception { + int totalPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(totalPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), totalPermits); + + for (int i = 0; i < totalPermits; i++) { + downloadThrottler.acquire(); + Assert.assertEquals(downloadThrottler.availablePermits(), totalPermits - i - 1); + Assert.assertEquals(downloadThrottler.totalPermits(), totalPermits); + } + for (int i = 0; i < totalPermits; i++) { + downloadThrottler.release(); + Assert.assertEquals(downloadThrottler.availablePermits(), i + 1); + Assert.assertEquals(downloadThrottler.totalPermits(), totalPermits); + } + } + + @Test + public void testThrowExceptionOnSettingInvalidConfigValues() { + Assert.assertThrows(IllegalArgumentException.class, () -> new SegmentDownloadThrottler(-1, TABLE_NAME_WITH_TYPE)); + Assert.assertThrows(IllegalArgumentException.class, () -> new SegmentDownloadThrottler(0, TABLE_NAME_WITH_TYPE)); + } + + @Test + public void testDisabledThrottlingBySettingDefault() + throws Exception { + // Default should be quite high. Should be able to essentially acquire as many permits as wanted + int defaultPermits = Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(defaultPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), defaultPermits); + + Assert.assertEquals(downloadThrottler.availablePermits(), defaultPermits); + for (int i = 0; i < defaultPermits; i++) { + downloadThrottler.acquire(); + Assert.assertEquals(downloadThrottler.totalPermits(), defaultPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), defaultPermits - i - 1); + } + } + + @Test + public void testPositiveToNegativeThrottleChange() { + int initialPermits = 2; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits); + + // Change the value of the cluster config for max parallel segment downloads to a negative value + // If maxDownloadConcurrency is <= 0, this is an invalid configuration change. 
Do nothing other than log a warning + Map<String, String> updatedClusterConfigs = new HashMap<>(); + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, "-1"); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits); + + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, "0"); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits); + } + + @Test + public void testIncreaseMaxParallelSegmentDownloads() + throws Exception { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + for (int i = 0; i < initialPermits; i++) { + downloadThrottler.acquire(); + } + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), 0); + + // Increase the value of the cluster config for max parallel segment downloads + Map<String, String> updatedClusterConfigs = new HashMap<>(); + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, + String.valueOf(initialPermits * 2)); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits * 2); + + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits); + for (int i = 0; i < initialPermits; i++) { + downloadThrottler.acquire(); + } + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits * 2); + Assert.assertEquals(downloadThrottler.availablePermits(), 0); + for (int i = 0; i < (initialPermits * 2); i++) { + downloadThrottler.release(); + } + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits * 2); + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits * 2); + } + + @Test + public void testDecreaseMaxParallelSegmentDownloads() + throws Exception { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + for (int i = 0; i < initialPermits; i++) { + downloadThrottler.acquire(); + } + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottler.availablePermits(), 0); + + // Decrease the value of the cluster config for max parallel segment downloads + Map<String, String> updatedClusterConfigs = new HashMap<>(); + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, + String.valueOf(initialPermits / 2)); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits / 2); + + Assert.assertEquals(downloadThrottler.availablePermits(), -(initialPermits / 2)); + for (int i = 0; i < 4; i++) { + downloadThrottler.release(); + } + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits / 2); + Assert.assertEquals(downloadThrottler.availablePermits(), initialPermits / 2); + } + + @Test + public void
testThrowException() + throws Exception { + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(1, TABLE_NAME_WITH_TYPE); + SegmentDownloadThrottler spy = spy(downloadThrottler); + spy.acquire(); + Assert.assertEquals(spy.availablePermits(), 0); + doThrow(new InterruptedException("interrupt")).when(spy).acquire(); + + Assert.assertThrows(InterruptedException.class, spy::acquire); + Assert.assertEquals(spy.availablePermits(), 0); + spy.release(); + Assert.assertEquals(spy.availablePermits(), 1); + } + + @Test + public void testChangeConfigsEmpty() { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + // Pass empty configs and keyset + Map<String, String> updatedClusterConfigs = new HashMap<>(); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + } + + @Test + public void testChangeConfigDeletedConfigsEmpty() { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + // Create a set of valid keys and pass clusterConfigs as null; the config should reset to the default + Set<String> keys = new HashSet<>(); + keys.add(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS); + downloadThrottler.onChange(keys, null); + Assert.assertEquals(downloadThrottler.totalPermits(), + Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS)); + } + + @Test + public void testChangeConfigsOtherThanRelevant() { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + // Add some random configs and call 'onChange' + Map<String, String> updatedClusterConfigs = new HashMap<>(); + updatedClusterConfigs.put("random.config.key", "random.config.value"); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + } + + @Test + public void testChangeConfigs() { + int initialPermits = 4; + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + + // Add relevant and random configs and call 'onChange' + Map<String, String> updatedClusterConfigs = new HashMap<>(); + // Random config + updatedClusterConfigs.put("random.config.key", "random.config.value"); + // Config not relevant to the download throttler + updatedClusterConfigs.put(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, + String.valueOf(initialPermits * 2)); + // Relevant config + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, + String.valueOf(initialPermits * 4)); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits * 4); + } + + @Test + public void testChangeConfigsTwoTables() { + int initialPermits = 4; + // Each table will have its own download throttler. 
For now they all take the same configuration as input + SegmentDownloadThrottler downloadThrottler = new SegmentDownloadThrottler(initialPermits, TABLE_NAME_WITH_TYPE); + SegmentDownloadThrottler downloadThrottlerTable2 = new SegmentDownloadThrottler(initialPermits, "test_REALTIME"); + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits); + Assert.assertEquals(downloadThrottlerTable2.totalPermits(), initialPermits); + + // Add relevant and random configs and call 'onChange' + Map<String, String> updatedClusterConfigs = new HashMap<>(); + // Random config + updatedClusterConfigs.put("random.config.key", "random.config.value"); + // Config not relevant to the download throttler + updatedClusterConfigs.put(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, + String.valueOf(initialPermits * 2)); + // Relevant config + updatedClusterConfigs.put(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, + String.valueOf(initialPermits * 4)); + downloadThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + downloadThrottlerTable2.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); + + Assert.assertEquals(downloadThrottler.totalPermits(), initialPermits * 4); + Assert.assertEquals(downloadThrottlerTable2.totalPermits(), initialPermits * 4); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index f8e2190b6c88..8b92fd9dce64 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -50,6 +50,7 @@ import org.apache.pinot.server.access.AllowAllAccessFactory; import org.apache.pinot.server.conf.ServerConf; import org.apache.pinot.server.worker.WorkerQueryServer; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; @@ -76,7 +77,6 @@ public class ServerInstance { private final GrpcQueryServer _grpcQueryServer; private final AccessControl _accessControl; private final HelixManager _helixManager; - private final SegmentPreprocessThrottler _segmentPreprocessThrottler; private final WorkerQueryServer _workerQueryServer; private ChannelHandler _instanceRequestHandler; @@ -85,7 +85,8 @@ public class ServerInstance { private boolean _queryServerStarted = false; public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessControlFactory accessControlFactory, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider) throws Exception { LOGGER.info("Initializing server instance"); _helixManager = helixManager; @@ -99,13 +100,11 @@ public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessCo _serverMetrics.setValueOfGlobalGauge(ServerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1); ServerMetrics.register(_serverMetrics); - _segmentPreprocessThrottler = segmentPreprocessThrottler; - String instanceDataManagerClassName = serverConf.getInstanceDataManagerClassName(); LOGGER.info("Initializing instance data manager of class: {}", instanceDataManagerClassName); _instanceDataManager = PluginManager.get().createInstance(instanceDataManagerClassName); 
_instanceDataManager.init(serverConf.getInstanceDataManagerConfig(), helixManager, _serverMetrics, - _segmentPreprocessThrottler); + segmentPreprocessThrottler, clusterConfigProvider); // Initialize ServerQueryLogger and FunctionRegistry before starting the query executor ServerQueryLogger.init(serverConf.getQueryLogMaxRate(), serverConf.getQueryLogDroppedReportMaxRate(), diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index c6ba62f8e77f..993104befb6a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -653,7 +653,8 @@ public void start() new SegmentPreprocessThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler); ServerConf serverConf = new ServerConf(_serverConf); - _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentPreprocessThrottler); + _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentPreprocessThrottler, + _clusterConfigChangeHandler); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 230871d4fcd1..0c8f3d7e9ac2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -65,6 +65,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -93,7 +94,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { private ZkHelixPropertyStore<ZNRecord> _propertyStore; private SegmentUploader _segmentUploader; private Supplier<Boolean> _isServerReadyToServeQueries = () -> false; - private SegmentPreprocessThrottler _segmentPreprocessThrottler; // Fixed size LRU cache for storing last N errors on the instance. // Key is TableNameWithType-SegmentName pair. 
@@ -112,7 +112,8 @@ public void setSupplierOfIsServerReadyToServeQueries(Supplier isServing @Override public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics, - @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) + @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler, + @Nullable PinotClusterConfigProvider clusterConfigProvider) throws Exception { LOGGER.info("Initializing Helix instance data manager"); @@ -120,12 +121,11 @@ public synchronized void init(PinotConfiguration config, HelixManager helixManag LOGGER.info("HelixInstanceDataManagerConfig: {}", _instanceDataManagerConfig.getConfig()); _instanceId = _instanceDataManagerConfig.getInstanceId(); _helixManager = helixManager; - _segmentPreprocessThrottler = segmentPreprocessThrottler; String tableDataManagerProviderClass = _instanceDataManagerConfig.getTableDataManagerProviderClass(); LOGGER.info("Initializing table data manager provider of class: {}", tableDataManagerProviderClass); _tableDataManagerProvider = PluginManager.get().createInstance(tableDataManagerProviderClass); _tableDataManagerProvider.init(_instanceDataManagerConfig, helixManager, _segmentLocks, - _segmentPreprocessThrottler); + segmentPreprocessThrottler, clusterConfigProvider); _segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(), ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), serverMetrics); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index ad190bcbde48..ae219d82b13c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,11 +67,12 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final int DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS = 4; // Key of how many parallel segment downloads can be made per table. - // A value of <= 0 indicates unlimited. + // Must have a value > 0. To effectively disable this, set to a very large value // Unlimited parallel downloads can make Pinot controllers receive high burst of download requests, // causing controllers unavailable for that period of time. 
- private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = "table.level.max.parallel.segment.downloads"; - private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = -1; + public static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = CommonConstants.Server.MAX_PARALLEL_SEGMENT_DOWNLOADS; + private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = + Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); // Key of server segment download rate limit // limit the rate to write download-untar stream to disk, in bytes diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 68ba85b4471a..471e76ef7f91 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -48,6 +48,7 @@ import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.server.access.AllowAllAccessFactory; import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.server.starter.helix.DefaultClusterConfigChangeHandler; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -202,7 +203,7 @@ protected void addTable(String tableNameWithType) { // table config. TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, null, - null, null); + null, null, new DefaultClusterConfigChangeHandler()); tableDataManager.start(); _tableDataManagerMap.put(tableNameWithType, tableDataManager); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 4032042ca7b9..322a33d68d7a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -741,6 +741,10 @@ public static class Server { public static final String CONFIG_OF_RELOAD_CONSUMING_SEGMENT = INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT; public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true; + public static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = "table.level.max.parallel.segment.downloads"; + public static final String CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + MAX_PARALLEL_SEGMENT_DOWNLOADS; + public static final String DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = "1000"; // Query logger related configs public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond";
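Reviewer note: the snippet below is a minimal, illustrative sketch (not part of the patch) of how the throttler introduced above is meant to be used and resized at runtime. The table name and permit values are made up for the example; the constructor, acquire()/release(), onChange(), and the CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS key are taken from this diff.

import java.util.Map;
import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
import org.apache.pinot.spi.utils.CommonConstants;

public class SegmentDownloadThrottlerUsageSketch {
  public static void main(String[] args) throws InterruptedException {
    // Hypothetical table; a real server builds one throttler per table in BaseTableDataManager.init(),
    // seeded from the instance config (default 1000 per CommonConstants.Server).
    SegmentDownloadThrottler throttler = new SegmentDownloadThrottler(4, "myTable_OFFLINE");

    // downloadSegmentFromDeepStore() takes a permit around the download + untar work.
    throttler.acquire();
    try {
      // download and untar the segment here
    } finally {
      throttler.release();
    }

    // When the ZK cluster config keyed by CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS changes,
    // the registered listener resizes the semaphore without a server restart.
    Map<String, String> clusterConfigs =
        Map.of(CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, "8");
    throttler.onChange(clusterConfigs.keySet(), clusterConfigs);
  }
}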