HBASE-29433: Resolve exception in AsyncTable for coprocessor services when a region merges/splits #7131
base: master
Conversation
I'll be away next week, so I'll take next steps here on the week of July 7th
@@ -944,4 +1001,59 @@ public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
    PartialResultCoprocessorCallback<S, R> callback) {
    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
  }

  private static class MultiRegionCoprocessorServiceProgress<R> {
The state needed to manage all the individual RPCs to regions was previously passed around as method parameters; this moves it into this helper class because I needed some additional state.
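For illustration only, here is a minimal sketch of the kind of state such a helper might track; the field names and the `RegionProgress` values are assumptions, not the PR's actual implementation:

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.client.RegionInfo;

// Hypothetical sketch; the real MultiRegionCoprocessorServiceProgress tracks more state.
class MultiRegionProgressSketch {
  enum RegionProgress { ASSIGNED, RESPONSE_STARTED, FINISHED, CANCELLED }

  // Per-region progress, keyed by RegionInfo so ranges of replaced regions can be compared.
  private final ConcurrentNavigableMap<RegionInfo, RegionProgress> regions =
    new ConcurrentSkipListMap<>();
  // Regions with outstanding RPCs; onComplete can only fire once this drops to zero.
  private final AtomicInteger unfinishedRegions = new AtomicInteger();
  // Whether a terminal error has already been delivered to the user's callback.
  private final AtomicBoolean errored = new AtomicBoolean();
}
```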
progress.onResponse(region, null, new DoNotRetryIOException(
  "Region " + region.getEncodedName() + " no longer exists, likely due to a split or"
    + " merge, and the coprocessor service was already in progress and can't be recovered"));
In this case, `CoprocessorCallback::onRegionComplete` has already been called at least once for this region, and I think it'd be better to throw an error in this edge case instead of trying to make the `CoprocessorCallback` handle this scenario. In this example scenario, if we had already sent these callbacks:

callback.onRegionComplete(r1, { sum => 10, next => 0x01 });
callback.onRegionComplete(r1, { sum => 7, next => 0x02 });
[r1 is split into r2 and r3]

how would we signal to the caller that the results for region r1 are actually no longer relevant, and that they should now consider those results replaced by r2 and r3? Again, I think keeping the interface as is and continuing to throw an error in this edge case would be better, but I'm open to other thoughts.
LOG.debug("Attempted to send a coprocessor service RPC to region {} which no" | ||
+ " longer exists, will attempt to send RPCs to the region(s) that replaced it", | ||
region.getEncodedName()); | ||
restartCoprocessorServiceForRange(stubMaker, callable, callback, region.getStartKey(), |
This is where we submit a new call for the range covered by the region that was split/merged.
@@ -78,8 +83,7 @@ private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message requ
    final Context context = Context.current();
    CompletableFuture<Message> future = new CompletableFuture<>();
    if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
-     future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected "
-       + region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString()));
+     future.completeExceptionally(new RegionNameChangedException(loc.getRegion()));
This works, but I wonder if there is a better place to detect this scenario from the client
...int/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncTableCoprocessorEndpoint.java
It doesn't look like the meta cache finds out that it has an invalid region cached. Could you tell it what it needs to clear from memory?
if (e instanceof RegionNoLongerExistsException) {
  RegionInfo newRegion = ((RegionNoLongerExistsException) e).getNewRegionInfo();
  if (progress.markNewRegionAndCheckNeedsToBeHandled(newRegion)) {
    if (progress.hasResponseStarted(region)) {
I imagine that this check in one thread will be racing against responses coming in on other threads. Is this logic safe without synchronization?
Hmm, yeah, there is a bug here; say regions A and B merged into C. If B had already started a response but A was the first to reach this check, then B wouldn't report an error, because it would never make it past `markNewRegionAndCheckNeedsToBeHandled`. Will fix that.

I can think of a couple more edge cases here around multiple split/merge operations that haven't been loaded into the cache; I'll come up with some more fixes for those, along with a closer look at how the meta cache is being updated during this process.
Ready for review; I've run the new unit tests 100s of times and am feeling confident that it's correct
addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
  try (Scope ignored = span.makeCurrent()) {
    if (e != null) {
      callback.onRegionError(region, e);
      if (e instanceof RegionNoLongerExistsException) {
Previously, this error would fail the region. With these changes, we'll retry the range that this error affects instead.
callback.onRegionError(region, e);
if (e instanceof RegionNoLongerExistsException) {
  RegionInfo newRegion = ((RegionNoLongerExistsException) e).getNewRegionInfo();
  if (progress.markRegionReplacedExistingAndCheckNeedsToBeRestarted(newRegion)) {
When a region is merged, this is an optimization to avoid sending multiple requests to the new region
callback.onRegionComplete(region, r);
}
if (!claimedRegionAssignment.get()) {
  if (!progress.tryClaimRegionAssignment(region)) {
There are two paths we need to consider now:

- the original call
- the recursive call (i.e. when we restart the request on the failed range)

Both attempt to claim an "assignment" for the region after they receive a response, before actually propagating the result to the client's `CoprocessorCallback`.
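As a hedged illustration of the claim-once idea (not the PR's actual `tryClaimRegionAssignment`; the map and method names below are assumptions):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: whichever path (original or restarted) claims a region first gets to
// deliver its responses to the user's callback; the loser backs off, so the callback never
// sees duplicate results for the same region.
class RegionAssignmentSketch {
  private final ConcurrentMap<String, String> claimedBy = new ConcurrentHashMap<>();

  /** Returns true only for the first caller to claim the given region. */
  boolean tryClaim(String encodedRegionName, String claimant) {
    return claimedBy.putIfAbsent(encodedRegionName, claimant) == null;
  }
}
```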
if (claimedRegionAssignment.get() || progress.tryClaimRegionAssignment(region)) {
  LOG.error("Error processing coprocessor service response for region {}",
    region.getEncodedName(), t);
  progress.onResponse(region, null, t);
}
progress.markRegionFinished(region);
I noticed that if `getNextCallable` throws, we never call `onComplete`; this catch clause fixes that.
List<Pair<RegionInfo, RegionProgress>> overlappingAssignments =
  getOverlappingRegionAssignments(region);
if (!overlappingAssignments.isEmpty()) {
  callback.onRegionError(region, new DoNotRetryIOException(
    ("Region %s has replaced a region that the coprocessor service already started on, likely"
      + " due to a region merge. Overlapping regions: %s")
        .formatted(region.getEncodedName(), overlappingAssignments.stream().map(Pair::getFirst)
          .map(RegionInfo::getEncodedName).collect(Collectors.joining(", ")))));
  tryUpdateRegionProgress(region, RegionProgress.ASSIGNED, RegionProgress.CANCELLED);
  return false;
}
Overlapping assignments is a tricky one to explain. Say we have this scenario:

Region A: 0x00 -> 0x88
Region B: 0x88 -> 0xFF

To send coprocessors to all regions, we'd call:

coprocessorService(0x00)
coprocessorService(0x88)

Let's say we thought A and B were the regions when the request started, because that was what was in the meta cache, but the regions actually looked like this:

Region C: 0x00 -> 0x55
Region D: 0x55 -> 0xAA
Region E: 0xAA -> 0xFF

If we go through this process to retry each of the failed regions, the restarted requests would re-run locations like this:

locateRange(0x00 -> 0x88): Region C, Region D
locateRange(0x88 -> 0xFF): Region D, Region E

And the actual RPCs would look like this:

coprocessorService(0x00)
coprocessorService(0x55)
coprocessorService(0x55)
coprocessorService(0x88)

Region D would get sent the RPC twice; there is no way from the client to atomically locate a region and send the RPC to it, so instead this PR attempts to catch this case when we get the response from region D for the second time.

Furthermore, this same race condition can happen when we are sending a coprocessor service that requires multiple RPCs per region, which is the case here where we'd see an overlapping assignment.
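For illustration, here is a hedged sketch of how overlapping claims could be detected by comparing key ranges against regions the service has already started on; the names are assumptions and may differ from the PR's `getOverlappingRegionAssignments`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: find previously claimed key ranges that intersect the range of a
// newly located region. An empty end key means "to the end of the table".
class OverlapSketch {
  // startKey -> endKey for every region we have already started the service on
  private final ConcurrentSkipListMap<byte[], byte[]> claimed =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  List<byte[]> overlappingStartKeys(byte[] start, byte[] end) {
    List<byte[]> result = new ArrayList<>();
    for (Map.Entry<byte[], byte[]> e : claimed.entrySet()) {
      // Ranges [start, end) and [e.getKey(), e.getValue()) overlap iff each starts
      // before the other one ends.
      boolean startsBeforeOtherEnds =
        e.getValue().length == 0 || Bytes.compareTo(start, e.getValue()) < 0;
      boolean endsAfterOtherStarts =
        end.length == 0 || Bytes.compareTo(e.getKey(), end) < 0;
      if (startsBeforeOtherEnds && endsAfterOtherStarts) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}
```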
private final ConcurrentNavigableMap<RegionInfo, RegionProgress> regions =
  new ConcurrentSkipListMap<>();
The largest potential risk I see in affecting existing functionality is that this assignment mechanism blocks a response that would have been allowed before. It imposes some strict constraints on how the methods in `PartialResultCoprocessorCallback` can be called (that I think are intuitive):

- `onRegionComplete` can only be called once per region
- `onRegionError` can be called any number of times per region
- `onComplete` will be the last call to the callback and will be called once if there isn't an error
- `onError` can be called any number of times, can happen at any time, and `onComplete` will not be called

For callbacks that require multiple RPCs per region (e.g. a paging mechanism), the same is true except that `onRegionComplete` will only return one chain of RPCs (where a chain is: make an RPC call, then continue until `getNextCallable()` returns null).
I went this direction (enforce a strict contract on the callback and avoid sending duplicate RPCs where possible) because I think it is the easiest way to get this right. Another approach I considered was having some mechanism that coordinated the RPCs before we sent them, but I found that difficult for a few reasons:

- I think reasoning about the ranges that needed to be re-processed would have been less intuitive, because it'd need to explicitly handle all the edge cases here
- We wouldn't have been able to recurse in the same way to simplify the problem (because the method signature doesn't expose a way to manage RPCs like that)
- It would have required some sort of locking to have all requests wait to send responses until we had confirmation that the regions were where we thought they were (i.e. the meta cache was up to date)
- It wouldn't have been any more correct, and would only slightly reduce the number of actual RPCs that we'd send in this very unlikely situation (we only send duplicate RPCs in the overlapping assignment situation described below)
Pair<CompletableFuture<Map<String, List<Pair<SumResponse, Throwable>>>>,
  PartialResultCoprocessorCallback<ColumnAggregationService, SumResponse>> futureAndCallback =
    buildCallbackForAllRegions((region, prevRequestNumber) -> {
This splits the region in between the first and second RPC for that region; in this case, because the callback was already called for the first RPC, we fail the region
Unfortunately this mechanism is intricate, hard to get right, and easy to break. I think it might be worth considering a smaller change that would avoid the original problem but potentially return incorrect results (like the existing HTable behavior).
I think there are scenarios where we do not know how to retry? For example, we have finished a request to a region, and when sending the request to the next region, these two regions are merged. Then what is the correct way to do the retrying here? Send the request again to the merged region? Or just jump to the next region?
Any updates here?
This updates how the client reacts to region merges/splits while processing `coprocessorService()`. Instead of failing the region, it resubmits the affected range of row keys to try to save the call from needing to be aborted.

Added unit tests that test this scenario. I also compared this behavior to the HTable from previous versions; HTable doesn't seem to have this same assertion, nor proper handling for split regions, meaning it will potentially return incorrect results.

The coprocessor service logic in the client goes like this when we need to submit RPCs to multiple regions:

1. The caller starts a coprocessor service call over a range of row keys.
2. The client locates the regions covering that range via a series of `AsyncTableRegionLocator` requests, starting with the start key and continuing to locate the next region until we've located the region containing the end key.
3. For each located region, the client sends the coprocessor service RPC(s), repeating until `PartialResultCoprocessorCallback::getNextCallable` returns `null`.
4. Once every region has finished, the client calls `CoprocessorCallback::onComplete` to indicate the call as done.

This PR inserts logic at step 3, starting a new call for the affected range covered by the (outdated) region returned by step 2.
Marking this as draft because I'll need some time to fix formatting/linting issues, but I'd like to get tests running.
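For additional context, here is a hedged sketch of how a caller typically drives `AsyncTable.coprocessorService` across all regions and collects one result per region; the helper method, its name, and the choice to key results by encoded region name are illustrative assumptions, not part of this PR:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import com.google.protobuf.RpcChannel;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ServiceCaller;

// Hedged usage sketch: run a coprocessor service against every region of a table and
// gather one response per region, keyed by encoded region name.
public class CoprocessorServiceUsageSketch {
  static <S, R> CompletableFuture<Map<String, R>> callAllRegions(AsyncTable<?> table,
      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> caller) {
    Map<String, R> results = new ConcurrentHashMap<>();
    CompletableFuture<Map<String, R>> future = new CompletableFuture<>();
    table.coprocessorService(stubMaker, caller, new AsyncTable.CoprocessorCallback<R>() {
      @Override
      public void onRegionComplete(RegionInfo region, R resp) {
        results.put(region.getEncodedName(), resp);
      }

      @Override
      public void onRegionError(RegionInfo region, Throwable error) {
        // A per-region failure; onComplete/onError below decide the overall outcome.
      }

      @Override
      public void onComplete() {
        future.complete(results);
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }
    }).execute(); // no fromRow/toRow, so the call covers the whole table
    return future;
  }
}
```

With the changes in this PR, a split or merge that happens mid-call is retried against the range that replaced the affected region, rather than surfacing as a failed region in the callback.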