-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Draft: PIT context relocation on shard relocation #132251
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
894b79f
to
18391a3
Compare
[CI] Auto commit changes from spotless
4ac74d5
to
fccadb1
Compare
if (node != null) { | ||
targetNodes = Collections.singleton(node); | ||
} else { | ||
staticLogger.info("---> missing node when closing context: " + contextId.getNode()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: We need to close the contexts after moving them when the "old" PIT is used, so if the originally encoded node it gone we try all remaining ones that currently hold that shard here (regardless of whether that node also hold a pit context.
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
public class PITHelper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for debugging atm
} | ||
|
||
@Override | ||
protected void doExecute(Task task, ClosePointInTimeRequest request, ActionListener<ClosePointInTimeResponse> listener) { | ||
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId()); | ||
final Collection<SearchContextIdForNode> contextIds = searchContextId.shards().values(); | ||
Map<ShardId, SearchContextIdForNode> shards = searchContextId.shards(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above changes in ClearScrollController. We also need to pass in the shardIds now so we can retry if the original node is gone.
targetNodes.add(perNode.getNode()); | ||
} | ||
if (perNode.getSearchContextId().getSearcherId() != null) { | ||
if (perNode.getSearchContextId().getSearcherId() != null || nodeExists == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where we re-try other shards when the original PIT node is gone now. Trying every node with a shard copy might be too much on the long run but without a cluster-wide service that keeps track of where the PIT contexts are this might be unavoidable. Needs follow up in terms of altering the PIT id once we found the new node where the PIT context lives now.
@@ -360,7 +372,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv | |||
|
|||
private final AtomicLong idGenerator = new AtomicLong(); | |||
|
|||
private final Map<Long, ReaderContext> activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); | |||
private final Map<ReaderContextId, ReaderContext> activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we keep track of the readers in this map only by an auto-increment long counter, which is also encoded in the PIT id to lookup the right ReaderContext. We cannot simply move the ReaderContext to another node using only that id now because of conflicts with existing contexts on that node. But we also cannot change it because its encoded in the PIT Id (via the ShardSearchContextIds). I changed the key to a new joint key that also uses the original sessionId encoded in the PIT for retrieval on the new node and to avoid id collisions.
return this.activeReaders.values() | ||
.stream() | ||
.filter(c -> c.singleSession() == false) | ||
.filter(c -> c.scrollContext() == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exclude single-session ReaderContexts because they get cleared after each search, also don't move anything that is a scroll context.
.collect(Collectors.toList()); | ||
} | ||
|
||
public void reopenPitContexts(ShardId shardId, String segmentsFileName, long keepAlive, String sessionId, long contextId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole part is pretty rough still, I mostly wanted something working in the receiving side of the PIT relocation that makes the integration test work. There are probably many open questions here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could move part of this logic into the Engine
? this would allow us to track the necessary metadata to keep blobs around (and it would throw if we ever try to call this in an Engine
that doesn't support to transfer PIT contexts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks in the right direction, I left a suggestion
.collect(Collectors.toList()); | ||
} | ||
|
||
public void reopenPitContexts(ShardId shardId, String segmentsFileName, long keepAlive, String sessionId, long contextId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could move part of this logic into the Engine
? this would allow us to track the necessary metadata to keep blobs around (and it would throw if we ever try to call this in an Engine
that doesn't support to transfer PIT contexts?
WIP work to support PIT context relocation when nodes gracefully shut down and shards are relocated.