diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 380b358a84ed..6fc5d4636d3c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -102,6 +102,7 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.ConsumerState; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -1619,7 +1620,17 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf // Acquire semaphore to create stream consumers try { - _partitionGroupConsumerSemaphore.acquire(); + while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + // reload segment metadata to get latest status + segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(_segmentNameStr); + + if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) { + // segment has already been uploaded by another server. + _segmentLogger.warn("segment: {} already exists. Skipping creation of RealtimeSegmentDataManager", + _segmentNameStr); + throw new SegmentAlreadyExistsException("segment: " + _segmentNameStr + " status must be in progress"); + } + } _acquiredConsumerSemaphore.set(true); } catch (InterruptedException e) { String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 2b4778d3904f..e11c91562092 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -529,10 +529,18 @@ private void doAddConsumingSegment(String segmentName) PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; - RealtimeSegmentDataManager realtimeSegmentDataManager = - new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, - schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, - partitionDedupMetadataManager, _isTableReadyToConsumeData); + RealtimeSegmentDataManager realtimeSegmentDataManager; + try { + realtimeSegmentDataManager = + new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, + schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, + partitionDedupMetadataManager, _isTableReadyToConsumeData); + } catch (SegmentAlreadyExistsException e) { + // Don't do anything since segment was already committed by another instance. + // Eventually this server should receive a CONSUMING -> ONLINE helix state transition + // to add it. + return; + } registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager); if (partitionUpsertMetadataManager != null) { partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyExistsException.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyExistsException.java new file mode 100644 index 000000000000..9af8c19d52d9 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyExistsException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +public class SegmentAlreadyExistsException extends RuntimeException { + + public SegmentAlreadyExistsException(String msg) { + super(msg); + } +}