Segment-level locks in compaction tasks #17660

nozjkoitop opened this issue Jan 24, 2025 · 0 comments
Affected Version

Tested with version 30.0.0. The related code fragments appear unchanged on master.

Description

Compaction of partitioned segments appears to break segment-level locking: the partial segment tasks have not been updated to work properly with it. During initialization, the compaction task acquires segment locks, but the partial tasks expect to obtain unique intervals from the LockPosses. This discrepancy leads to several problems during the compaction process.

Current Behavior

The problematic method (`CachingLocalSegmentAllocator.createVersionFinder`) receives three locks for each segment plus a compaction-stage-related lock, and throws a duplicate-key exception:

java.lang.IllegalStateException: Duplicate key 2025-01-13T00:00:00.000Z/2025-01-14T00:00:00.000Z (attempted merging values 2025-01-13T00:00:04.948Z and 2025-01-13T00:00:04.948Z)
	at java.base/java.util.stream.Collectors.duplicateKeyException(Collectors.java:133) ~[?:?]
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) ~[?:?]
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:?]
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
	at org.apache.druid.indexing.common.task.CachingLocalSegmentAllocator.createVersionFinder(CachingLocalSegmentAllocator.java:114) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.CachingLocalSegmentAllocator.<init>(CachingLocalSegmentAllocator.java:85) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.SegmentAllocators.forNonLinearPartitioning(SegmentAllocators.java:95) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask.createSegmentAllocator(PartialRangeSegmentGenerateTask.java:184) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask.generateSegments(PartialSegmentGenerateTask.java:193) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask.runTask(PartialSegmentGenerateTask.java:123) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478) [druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450) [druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) [guava-32.0.1-jre.jar:?]
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75) [guava-32.0.1-jre.jar:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) [guava-32.0.1-jre.jar:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
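The failure pattern can be illustrated with a minimal, self-contained sketch. The `Lock` record and `uniqueVersions` helper below are hypothetical stand-ins for the LockPosses handed to `createVersionFinder`, not actual Druid code; the point is that `Collectors.toMap` without a merge function throws exactly this `IllegalStateException` as soon as two locks share an interval, which is the normal situation under segment-level locking:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LockVersionDemo
{
    // Hypothetical stand-in for a lock entry: an interval and the version it carries.
    record Lock(String interval, String version) {}

    // Mirrors the failing pattern: building an interval -> version map with
    // Collectors.toMap and no merge function, which assumes intervals are unique.
    static Map<String, String> uniqueVersions(List<Lock> locks)
    {
        return locks.stream().collect(Collectors.toMap(Lock::interval, Lock::version));
    }

    public static void main(String[] args)
    {
        // Under segment-level locking, several locks share the same interval.
        List<Lock> locks = List.of(
            new Lock("2025-01-13/2025-01-14", "2025-01-13T00:00:04.948Z"),
            new Lock("2025-01-13/2025-01-14", "2025-01-13T00:00:04.948Z")
        );
        try {
            uniqueVersions(locks);
        }
        catch (IllegalStateException e) {
            // e.g. "Duplicate key 2025-01-13/2025-01-14 (attempted merging values ...)"
            System.out.println(e.getMessage());
        }
    }
}
```

With a single time-chunk lock per interval the same code runs fine, which is why the problem only surfaces with `forceTimeChunkLock=false`.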

Observations from debugging:

forceTimeChunkLock=false:

[screenshot of the lock list omitted]

The first three locks are created at the startup of the main compaction task and are treated as time-chunk locks, even though segment-level locks were created at the very beginning.

forceTimeChunkLock=true:

[screenshot of the lock list omitted]

After implementing a simple workaround in the problematic method of CachingLocalSegmentAllocator (grouping the locks by interval and selecting the max version) and a successful partial_range_index_generate, the compaction still fails. The error occurs in the validation stage of partial_index_generic_merge, where the following exception is thrown:

org.apache.druid.java.util.common.ISE: Unexpected state: Two versions([2025-01-22T12:19:31.907Z], [2025-01-22T12:19:31.907Z]) for the same interval[2016-06-27T00:00:00.000Z/2016-06-28T00:00:00.000Z]

	at org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask.lambda$runTask$2(PartialSegmentMergeTask.java:164) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
	at org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask.runTask(PartialSegmentMergeTask.java:154) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask.runTask(PartialGenericSegmentMergeTask.java:46) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179) ~[druid-indexing-service-30.0.0-patch.jar:30.0.0-patch]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478) [druid-indexing-service-30.0.0-patch.jar:30.0.0]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450) [druid-indexing-service-30.0.0-patch.jar:30.0.0]
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) [guava-32.0.1-jre.jar:?]
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75) [guava-32.0.1-jre.jar:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) [guava-32.0.1-jre.jar:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
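For reference, the grouping workaround described above could look roughly like the sketch below. The `Lock` record and `maxVersionPerInterval` helper are hypothetical stand-ins, not the actual patch: the pattern is to supply `Collectors.toMap` with a merge function instead of letting it throw, keeping the greatest version per interval (Druid versions are ISO-8601 timestamps, so string order matches time order):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class MaxVersionWorkaround
{
    // Hypothetical stand-in for a lock entry: an interval and the version it carries.
    record Lock(String interval, String version) {}

    // Groups locks by interval and keeps the lexicographically greatest version,
    // so duplicate intervals no longer throw a duplicate-key exception.
    static Map<String, String> maxVersionPerInterval(List<Lock> locks)
    {
        return locks.stream().collect(Collectors.toMap(
            Lock::interval,
            Lock::version,
            BinaryOperator.maxBy(Comparator.<String>naturalOrder())
        ));
    }

    public static void main(String[] args)
    {
        List<Lock> locks = List.of(
            new Lock("2025-01-13/2025-01-14", "2025-01-13T00:00:04.948Z"),
            new Lock("2025-01-13/2025-01-14", "2025-01-13T00:00:05.000Z")
        );
        // Keeps a single entry per interval, with the later version.
        System.out.println(maxVersionPerInterval(locks));
    }
}
```

As the second stack trace shows, this only moves the failure: PartialSegmentMergeTask performs its own interval-to-version validation and still rejects multiple locks on the same interval, so the fix needs to cover the merge stage as well.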

Expected Behavior

The partial segment tasks should correctly handle segment-level locks, ensuring proper lock management during compaction, and the compaction task should complete successfully.

Steps to Reproduce

  • Run a compaction task for an interval that includes partitioned segments with forceTimeChunkLock=false.