Make table.level.max.parallel.segment.downloads dynamically configurable via ZK cluster configs.

Modify it to always be enabled, and use a large config value as the default to mimic unlimited downloads.
somandal committed Feb 5, 2025
1 parent e9ee474 commit 5de67e3
Showing 21 changed files with 505 additions and 49 deletions.
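The new SegmentDownloadThrottler replaces the plain Semaphore so that the permit count can be adjusted at runtime when the ZK cluster config changes; its own diff is not included in the hunks shown here. Below is a minimal sketch of how such a resizable throttler can be built on top of java.util.concurrent.Semaphore — the class and method names are placeholders for illustration, not the actual Pinot implementation.

import java.util.concurrent.Semaphore;

/**
 * Hypothetical sketch of a download throttler whose permit count can be resized at
 * runtime (e.g. from a ZK cluster config change). Not the actual SegmentDownloadThrottler.
 */
public class ResizableDownloadThrottler {
  // Semaphore.reducePermits() is protected, so expose it through a small subclass.
  private static class ResizableSemaphore extends Semaphore {
    ResizableSemaphore(int permits) {
      super(permits, true);
    }

    void reduce(int reduction) {
      reducePermits(reduction);
    }
  }

  private final ResizableSemaphore _semaphore;
  private int _maxPermits;

  public ResizableDownloadThrottler(int maxPermits) {
    _maxPermits = maxPermits;
    _semaphore = new ResizableSemaphore(maxPermits);
  }

  public void acquire() throws InterruptedException {
    _semaphore.acquire();
  }

  public void release() {
    _semaphore.release();
  }

  public int getQueueLength() {
    return _semaphore.getQueueLength();
  }

  /** Apply a new limit from the cluster config: grow or shrink the permit count. */
  public synchronized void resize(int newMaxPermits) {
    if (newMaxPermits > _maxPermits) {
      _semaphore.release(newMaxPermits - _maxPermits);
    } else if (newMaxPermits < _maxPermits) {
      _semaphore.reduce(_maxPermits - newMaxPermits);
    }
    _maxPermits = newMaxPermits;
  }
}

A quick standalone check of the resize semantics (again exercising the hypothetical class above, not Pinot code): shrinking the limit never interrupts in-flight downloads, it only constrains new acquisitions.

public static void main(String[] args) throws InterruptedException {
  ResizableDownloadThrottler throttler = new ResizableDownloadThrottler(4);
  throttler.acquire();
  throttler.acquire();                             // 2 of 4 permits held by in-flight downloads
  throttler.resize(1);                             // shrink the limit; current holders are unaffected
  System.out.println(throttler.getQueueLength());  // 0: nothing is blocked yet
  throttler.release();
  throttler.release();                             // after both finish, only 1 download may run at a time
}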
@@ -36,7 +36,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -71,6 +70,7 @@
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -91,6 +91,7 @@
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -127,9 +128,9 @@ public abstract class BaseTableDataManager implements TableDataManager {
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;
protected SegmentPreprocessThrottler _segmentPreprocessThrottler;
protected PinotClusterConfigProvider _clusterConfigProvider;
// Semaphore to restrict the maximum number of parallel segment downloads for a table
// TODO: Make this configurable via ZK cluster configs to avoid server restarts to update
private Semaphore _segmentDownloadSemaphore;
private SegmentDownloadThrottler _segmentDownloadSemaphore;

// Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value.
protected Cache<Pair<String, String>, SegmentErrorInfo> _errorCache;
@@ -142,7 +143,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) {
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler,
@Nullable PinotClusterConfigProvider clusterConfigProvider) {
LOGGER.info("Initializing table data manager for table: {}", tableConfig.getTableName());

_instanceDataManagerConfig = instanceDataManagerConfig;
@@ -171,6 +173,7 @@ public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManag
}
_errorCache = errorCache;
_segmentPreprocessThrottler = segmentPreprocessThrottler;
_clusterConfigProvider = clusterConfigProvider;
_recentlyDeletedSegments =
CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES).build();
@@ -196,14 +199,23 @@ public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManag
_streamSegmentDownloadUntarRateLimitBytesPerSec);
}
int maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
if (maxParallelSegmentDownloads > 0) {
LOGGER.info(
"Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}",
_tableNameWithType, maxParallelSegmentDownloads);
_segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads, true);
} else {
_segmentDownloadSemaphore = null;
// For backward compatibility, check if the value is negative and set it to the default in that case
// Earlier this config could be negative, in which case download throttling was completely disabled
if (maxParallelSegmentDownloads <= 0) {
LOGGER.warn("Segment download config: {} found to be <= 0: value {}, resetting it to default of: {}",
CommonConstants.Server.CONFIG_OF_MAX_PARALLEL_SEGMENT_DOWNLOADS, maxParallelSegmentDownloads,
CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
maxParallelSegmentDownloads = Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
}
LOGGER.info(
"Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}",
_tableNameWithType, maxParallelSegmentDownloads);
_segmentDownloadSemaphore = new SegmentDownloadThrottler(maxParallelSegmentDownloads, _tableNameWithType);
// If the cluster config provider exists, register the download semaphore to get dynamic config updates
if (_clusterConfigProvider != null) {
_clusterConfigProvider.registerClusterConfigChangeListener(_segmentDownloadSemaphore);
}

_logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName());

doInit();
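Two details in the hunk above are worth calling out: a non-positive config value, which previously disabled download throttling altogether, is now reset to the large default so the throttler is always constructed, and the throttler registers itself with the cluster config provider (when one is supplied) to pick up later config changes. The listener contract itself is not part of this excerpt; assuming the callback receives the set of changed keys and the current cluster configs, the resize hook could look roughly like the following, building on the ResizableDownloadThrottler sketch earlier (names and signature are assumptions, not Pinot's actual API).

// Hypothetical listener callback; the signature and key handling are assumptions.
public void onChange(java.util.Set<String> changedConfigs, java.util.Map<String, String> clusterConfigs) {
  String key = "table.level.max.parallel.segment.downloads";
  if (!changedConfigs.contains(key)) {
    return;  // unrelated cluster config change
  }
  String value = clusterConfigs.get(key);
  // Fall back to the large default when the key is removed, mirroring the init() logic above.
  int newMax = value == null
      ? Integer.parseInt(CommonConstants.Server.DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS)
      : Integer.parseInt(value);
  resize(newMax);  // adjust the semaphore permits without a server restart
}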
@@ -806,14 +818,12 @@ protected File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
String downloadUrl = zkMetadata.getDownloadUrl();
_logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
if (_segmentDownloadSemaphore != null) {
long startTime = System.currentTimeMillis();
_logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName,
_segmentDownloadSemaphore.getQueueLength());
_segmentDownloadSemaphore.acquire();
_logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).",
segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength());
}
long startTime = System.currentTimeMillis();
_logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName,
_segmentDownloadSemaphore.getQueueLength());
_segmentDownloadSemaphore.acquire();
_logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).",
segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength());
try {
File untarredSegmentDir;
if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
@@ -843,9 +853,7 @@
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
throw e;
} finally {
if (_segmentDownloadSemaphore != null) {
_segmentDownloadSemaphore.release();
}
_segmentDownloadSemaphore.release();
FileUtils.deleteQuietly(tempRootDir);
}
}
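With the throttler always non-null, the earlier null guards around the semaphore go away and the download path reduces to the standard acquire / try / finally / release pattern. A compressed illustration of that pattern follows; the helper name is a placeholder for the fetch-and-untar logic in this method, not an actual Pinot method.

void downloadWithThrottle(String segmentName) throws Exception {
  _segmentDownloadSemaphore.acquire();    // blocks while the table is already at its max parallel downloads
  try {
    fetchAndUntarSegment(segmentName);    // hypothetical stand-in for the download + untar + move logic
  } finally {
    _segmentDownloadSemaphore.release();  // always release, even when the download throws
  }
}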
@@ -34,6 +34,7 @@
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider;
import org.apache.pinot.spi.env.PinotConfiguration;


@@ -51,7 +52,8 @@ public interface InstanceDataManager {
* <p>NOTE: The config is the subset of server config with prefix 'pinot.server.instance'
*/
void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler)
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler,
@Nullable PinotClusterConfigProvider clusterConfigProvider)
throws Exception;

/**
@@ -35,6 +35,7 @@
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -50,16 +51,19 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider
private SegmentLocks _segmentLocks;
private Semaphore _segmentBuildSemaphore;
private SegmentPreprocessThrottler _segmentPreprocessThrottler;
private PinotClusterConfigProvider _clusterConfigProvider;

@Override
public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler) {
SegmentLocks segmentLocks, @Nullable SegmentPreprocessThrottler segmentPreprocessThrottler,
@Nullable PinotClusterConfigProvider clusterConfigProvider) {
_instanceDataManagerConfig = instanceDataManagerConfig;
_helixManager = helixManager;
_segmentLocks = segmentLocks;
int maxParallelSegmentBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
_segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null;
_segmentPreprocessThrottler = segmentPreprocessThrottler;
_clusterConfigProvider = clusterConfigProvider;
}

@Override
@@ -89,7 +93,7 @@ public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable E
throw new IllegalStateException();
}
tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentPreloadExecutor,
errorCache, _segmentPreprocessThrottler);
errorCache, _segmentPreprocessThrottler, _clusterConfigProvider);
return tableDataManager;
}
}
@@ -30,6 +30,7 @@
import org.apache.pinot.segment.local.utils.SegmentPreprocessThrottler;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider;
import org.apache.pinot.spi.config.table.TableConfig;


@@ -40,7 +41,8 @@
public interface TableDataManagerProvider {

void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler);
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler,
@Nullable PinotClusterConfigProvider clusterConfigProvider);

default TableDataManager getTableDataManager(TableConfig tableConfig) {
return getTableDataManager(tableConfig, null, null, () -> true);
@@ -124,7 +124,7 @@ private TableDataManager makeTestableManager()
new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true));
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, null,
null, segmentPreprocessThrottler);
null, segmentPreprocessThrottler, null);
tableDataManager.start();
Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
@@ -662,7 +662,7 @@ private static OfflineTableDataManager createTableManager(InstanceDataManagerCon
SegmentLocks segmentLocks = new SegmentLocks();
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, helixManager, segmentLocks, DEFAULT_TABLE_CONFIG, null, null,
SEGMENT_PREPROCESS_THROTTLER);
SEGMENT_PREPROCESS_THROTTLER, null);
return tableDataManager;
}

@@ -152,7 +152,7 @@ private DimensionTableDataManager makeTableDataManager(HelixManager helixManager
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, null, null,
SEGMENT_PREPROCESS_THROTTLER);
SEGMENT_PREPROCESS_THROTTLER, null);
tableDataManager.start();
return tableDataManager;
}
@@ -794,7 +794,7 @@ public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null);
tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null, null);
TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
tableDataManager.shutDown();
@@ -135,7 +135,7 @@ public void setUp()
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null);
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null);
TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
//we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried
@@ -160,7 +160,7 @@ public void setUp()
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null);
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null);
TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
for (ImmutableSegment indexSegment : _indexSegments) {
@@ -278,7 +278,7 @@ public void setUp()
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null);
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null);
TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
tableDataManager.start();
for (IndexSegment indexSegment : _indexSegments) {
@@ -140,7 +140,7 @@ public void setup()
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null);
tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null, null);
TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
tableDataManager.addSegment(_segment);
@@ -39,6 +39,7 @@
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.provider.PinotClusterConfigProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

@@ -55,7 +56,8 @@ public interface TableDataManager {
void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks,
TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler);
@Nullable SegmentPreprocessThrottler segmentPreprocessThrottler,
@Nullable PinotClusterConfigProvider clusterConfigProvider);

/**
* Returns the instance id of the server.