SharedMergeScheduler using shared thread pool for multi-tenant merge scheduling #14900
This PR does not have an entry in lucene/CHANGES.txt. Consider adding one. If the PR doesn't need a changelog entry, then add the skip-changelog label to it and you will stop receiving this reminder on future updates to the PR. |
This seems to be in the right direction. Let's make it a singleton? As a next step, should we add support for prioritising small merges over larger ones (by backing the thread pool with a priority blocking queue)? Also, since this is a draft PR, let's set it as "draft" on GitHub. You can also add a comment anywhere in your PR that says |
@vigyasharma what would you think of the following:
|
@jpountz I feel the scheduler should have some control on how the executor is created, its backing queue etc, so that it can prioritize how merges get scheduled, e.g. pick smaller merges before large ones, or have merges smaller than We could have the multi-tenant CMS accept a constructor argument for a custom |
Based on all this, I think we get the following structure:
FWIW, I'm not really sure if sharing a thread pool b/w indexing and merging would be simpler than having separate thread pools and applying backpressure on indexing. I still need to grok all the details. But if we think backpressure is the way to go, and we'll always only use the OTOH, if we want a shared indexing/merging thread pool, or foresee a need to have different thread pools for sets of writers (shards in Elasticsearch / OpenSearch), then the ctor arg for executor service makes sense. |
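One way to read the "priority blocking queue" idea discussed above is sketched below. This is only an illustration, not code from the PR: `SizedMergeTask` and `PriorityMergePool` are hypothetical names, and the size value stands in for whatever estimate the scheduler has for a merge. The only library pieces used are the JDK's `ThreadPoolExecutor` and `PriorityBlockingQueue`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical task type: a merge runnable tagged with its estimated size in bytes. */
final class SizedMergeTask implements Runnable, Comparable<SizedMergeTask> {
  final long sizeBytes;
  private final Runnable body;

  SizedMergeTask(long sizeBytes, Runnable body) {
    this.sizeBytes = sizeBytes;
    this.body = body;
  }

  @Override
  public void run() {
    body.run();
  }

  @Override
  public int compareTo(SizedMergeTask other) {
    return Long.compare(sizeBytes, other.sizeBytes); // smaller merges sort first
  }
}

/** Hypothetical factory for a pool whose waiting tasks are ordered by merge size. */
final class PriorityMergePool {
  static ThreadPoolExecutor create(int threads) {
    // Tasks must go through execute(), not submit(): submit() wraps them in a
    // FutureTask, which is not Comparable and would fail inside the priority queue.
    return new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
  }

  /** Demo: while the single worker is busy, queued merges are picked in size order. */
  static List<Long> demo() {
    ThreadPoolExecutor pool = create(1);
    List<Long> order = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch gate = new CountDownLatch(1);
    try {
      // Occupy the only worker so the following tasks have to wait in the queue.
      pool.execute(new SizedMergeTask(0, () -> {
        try {
          gate.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }));
      for (long size : new long[] {500, 10, 200}) {
        pool.execute(new SizedMergeTask(size, () -> order.add(size)));
      }
      gate.countDown();
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return order;
  }
}
```

In this sketch the ordering only applies to tasks that are waiting; a large merge that has already started is not preempted by a smaller one arriving later.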
…nstead of static field
… and size tracking
  public long getMergeSize() {
    return mergeSize;
  }
}
nit: we keep an empty line at the end of file. There might be an editor/formatter setting you want to add that brings it in automatically.
public class MergeTaskWrapper {
  private final Runnable mergeTask;
  private final Object sourceWriter; // Can be IndexWriter or its ID
You can directly use IndexWriter or String for ID instead of using Object. Keeping the type as specific as possible helps catch a lot of bugs before runtime.
   * version, the shared executor should be properly shut down.
   */
  @Override
  public void close() {
This impl. for close() will shut down the entire executor. Since this is a shared merge scheduler, you only want to address merges related to the closing IndexWriter, like cancelling all pending merges and waiting for running merges to complete. (See close() in CMS as well).
@@ -0,0 +1,25 @@
package org.apache.lucene.index;

public class MergeTaskWrapper {
I'm curious how we intend to use this wrapper, looking forward to the next iteration of this PR.
To me the reason for sharing the thread pools is not to make things simpler but rather to make it easier to control the overall number of active indexing+merging threads. For CPU-bound workloads (which indexing tends to be), the best approach is to create a thread pool sized based on the number of cores of the machine. You can't do this if indexing and merging don't use the same thread pool. |
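A minimal sketch of the sizing argument above, assuming a single pool that serves both indexing and merging tasks. `SharedCpuPool` and its methods are hypothetical names used only for illustration; the two tasks are placeholders for real indexing and merging work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for one CPU-sized pool shared by indexing and merging. */
final class SharedCpuPool {
  /** Sizes the pool to the number of cores reported by the JVM. */
  static ExecutorService create() {
    int cores = Runtime.getRuntime().availableProcessors();
    return Executors.newFixedThreadPool(cores);
  }

  /** Demo: both kinds of work go to the same pool, so total active threads stay bounded. */
  static int demo() {
    ExecutorService pool = create();
    AtomicInteger completed = new AtomicInteger();
    try {
      pool.execute(() -> completed.incrementAndGet()); // placeholder for an indexing task
      pool.execute(() -> completed.incrementAndGet()); // placeholder for a merge task
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return completed.get();
  }
}
```

With two separate pools, each would need its own size, and the sum of the two could exceed the core count whenever both are busy.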
   */
  @Override
  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
    while (true) {
Can we use hasPendingMerges() instead?
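The loop shape this suggestion points at could look roughly like the sketch below. To keep it self-contained, `PendingMergeSource` is a hypothetical stand-in that mirrors only two methods of Lucene's `MergeSource` (`hasPendingMerges()` and `getNextMerge()`), and merges are represented as plain strings rather than `OneMerge` objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two MergeSource methods used in the loop. */
interface PendingMergeSource {
  boolean hasPendingMerges();

  String getNextMerge(); // returns null when nothing is pending
}

final class MergeLoopSketch {
  /** Pulls merges until the source reports none pending. */
  static List<String> drain(PendingMergeSource source) {
    List<String> scheduled = new ArrayList<>();
    while (source.hasPendingMerges()) {
      String merge = source.getNextMerge();
      if (merge == null) {
        break; // the pending merge may have been taken between the two calls
      }
      scheduled.add(merge); // a real scheduler would hand the merge to its executor here
    }
    return scheduled;
  }
}
```

The null check is kept because the condition and the fetch are two separate calls, so the loop condition alone does not guarantee a merge is returned.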
        MergeTaskWrapper wrappedTask = new MergeTaskWrapper(mergeRunnable, (IndexWriter) mergeSource, merge.totalBytesSize());

        // Registering this task under the writer
        writerToMerges.computeIfAbsent((IndexWriter) mergeSource, k -> new CopyOnWriteArraySet<>()).add(wrappedTask);
I don't think you can cast mergeSource as an IndexWriter. However, you might not need to. This CMS is no longer a singleton, only the executor is shared. The CMS is actually owned by an indexWriter, which means all calls to merge() come from the same IW.
So instead of a writer -> merges mapping that you'd need in a singleton, all you need here is the set of pending and running merges submitted to this CMS. They are all from the same writer! When writer closes, instead of shutting down the executor, you could update the merge objects in the set (MergeTaskWrappers), and set an aborted flag. And update your runnable to skip the merge if "aborted" flag has been set.
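A rough sketch of the per-writer set plus "aborted" flag described above, under stated assumptions: `AbortableMergeTask` and `PerWriterScheduler` are hypothetical names, merges are plain `Runnable`s, and `close()` here only flags tasks. Waiting for already-running merges to finish, which the earlier close() comment also asks for, is left out to keep the example short.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper: runs the merge unless it was aborted before it started. */
final class AbortableMergeTask implements Runnable {
  private final Runnable merge;
  private final AtomicBoolean aborted = new AtomicBoolean();

  AbortableMergeTask(Runnable merge) {
    this.merge = merge;
  }

  void abort() {
    aborted.set(true);
  }

  @Override
  public void run() {
    if (!aborted.get()) {
      merge.run();
    }
  }
}

/** Hypothetical scheduler owned by one writer; only the executor is shared. */
final class PerWriterScheduler implements AutoCloseable {
  private final ExecutorService sharedExecutor;
  private final Set<AbortableMergeTask> tasks = ConcurrentHashMap.newKeySet();

  PerWriterScheduler(ExecutorService sharedExecutor) {
    this.sharedExecutor = sharedExecutor;
  }

  void schedule(Runnable merge) {
    AbortableMergeTask task = new AbortableMergeTask(merge);
    tasks.add(task);
    sharedExecutor.execute(() -> {
      try {
        task.run();
      } finally {
        tasks.remove(task); // drop finished or skipped tasks from this writer's set
      }
    });
  }

  @Override
  public void close() {
    // Flag only this writer's merges; the shared executor keeps serving other writers.
    for (AbortableMergeTask task : tasks) {
      task.abort();
    }
  }

  /** Demo: closing writer A skips its queued merge, while writer B still runs on the pool. */
  static int[] demo() {
    ExecutorService shared = Executors.newFixedThreadPool(1);
    CountDownLatch gate = new CountDownLatch(1);
    AtomicInteger ranA = new AtomicInteger();
    AtomicInteger ranB = new AtomicInteger();
    PerWriterScheduler a = new PerWriterScheduler(shared);
    PerWriterScheduler b = new PerWriterScheduler(shared);
    try {
      a.schedule(() -> {
        try {
          gate.await(); // keeps the single worker busy so the next merge stays queued
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      a.schedule(ranA::incrementAndGet);
      a.close();
      boolean executorStillOpen = !shared.isShutdown();
      b.schedule(ranB::incrementAndGet);
      gate.countDown();
      shared.shutdown();
      shared.awaitTermination(10, TimeUnit.SECONDS);
      return new int[] {ranA.get(), ranB.get(), executorStillOpen ? 1 : 0};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```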
[Draft] SharedMergeScheduler using shared thread pool for multi-tenant merge scheduling

This draft PR introduces a prototype SharedMergeScheduler, which extends MergeScheduler and routes all merge tasks through a shared thread pool across IndexWriter instances.

Motivation

In multi-tenant environments (e.g., Solr, Elasticsearch), many IndexWriters may coexist in the same JVM. The default Lucene behavior assigns each writer its own ConcurrentMergeScheduler and thread pool, which can lead to resource oversubscription and inefficient coordination.

This implementation introduces a new merge scheduler that centralizes merge execution via a shared thread pool. This idea builds on feedback from GitHub issue #13883, where contributors suggested exploring a dedicated scheduler based on Java's executor framework, rather than modifying the existing ConcurrentMergeScheduler.

Implementation Highlights

- SharedMergeScheduler in lucene.index
- merge(MergeSource, MergeTrigger) method using the public MergeSource API
- Executors.newFixedThreadPool(4) as a prototype shared executor
- ConcurrentMergeScheduler unchanged for easier evaluation and iteration

Next Steps

This PR is opened to propose and evaluate a shared-thread-pool merge scheduler design, and to gather feedback for further development.