Skip to content

Commit 1586737

Browse files
committed
Update FallbackExecutor to apply fallback prior to postExecute
Updates FallbackExecutor to apply fallback prior to postExecute. This ensures that any event handling done as part of postExecute takes the final fallback result into account. Partially addresses #248.
1 parent b7bc301 commit 1586737

File tree

3 files changed

+139
-54
lines changed

3 files changed

+139
-54
lines changed

src/main/java/net/jodah/failsafe/FallbackExecutor.java

+57-35
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import net.jodah.failsafe.util.concurrent.Scheduler;
1919

2020
import java.util.concurrent.*;
21+
import java.util.function.Supplier;
2122

2223
/**
2324
* A PolicyExecutor that handles failures according to a {@link Fallback}.
@@ -27,47 +28,68 @@ class FallbackExecutor extends PolicyExecutor<Fallback> {
2728
super(fallback, execution);
2829
}
2930

31+
/**
32+
* Performs an execution by calling pre-execute else calling the supplier, applying a fallback if it fails, and
33+
* calling post-execute.
34+
*/
3035
@Override
3136
@SuppressWarnings("unchecked")
32-
protected ExecutionResult onFailure(ExecutionResult result) {
33-
try {
34-
return policy == Fallback.VOID ?
35-
result.withNonResult() :
36-
result.withResult(policy.apply(result.getResult(), result.getFailure(), execution.copy()));
37-
} catch (Throwable t) {
38-
return ExecutionResult.failure(t);
39-
}
37+
protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, Scheduler scheduler) {
38+
return () -> {
39+
ExecutionResult result = supplier.get();
40+
if (isFailure(result)) {
41+
try {
42+
result = policy == Fallback.VOID ?
43+
result.withNonResult() :
44+
result.withResult(policy.apply(result.getResult(), result.getFailure(), execution.copy()));
45+
} catch (Throwable t) {
46+
result = ExecutionResult.failure(t);
47+
}
48+
}
49+
50+
return postExecute(result);
51+
};
4052
}
4153

54+
/**
55+
* Performs an async execution by calling pre-execute else calling the supplier and doing a post-execute.
56+
*/
57+
@Override
4258
@SuppressWarnings("unchecked")
43-
protected CompletableFuture<ExecutionResult> onFailureAsync(ExecutionResult result, Scheduler scheduler,
44-
FailsafeFuture<Object> future) {
45-
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
46-
Callable<Object> callable = () -> {
47-
try {
48-
CompletableFuture<Object> fallback = policy.applyStage(result.getResult(), result.getFailure(),
49-
execution.copy());
50-
fallback.whenComplete((innerResult, failure) -> {
51-
if (failure instanceof CompletionException)
52-
failure = failure.getCause();
53-
ExecutionResult r = failure == null ? result.withResult(innerResult) : ExecutionResult.failure(failure);
54-
promise.complete(r);
55-
});
56-
} catch (Throwable t) {
57-
promise.complete(ExecutionResult.failure(t));
58-
}
59-
return null;
60-
};
59+
protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
60+
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<Object> future) {
61+
return () -> supplier.get().thenCompose(result -> {
62+
if (isFailure(result)) {
63+
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
64+
Callable<Object> callable = () -> {
65+
try {
66+
CompletableFuture<Object> fallback = policy.applyStage(result.getResult(), result.getFailure(),
67+
execution.copy());
68+
fallback.whenComplete((innerResult, failure) -> {
69+
if (failure instanceof CompletionException)
70+
failure = failure.getCause();
71+
ExecutionResult r = failure == null ? result.withResult(innerResult) : ExecutionResult.failure(failure);
72+
promise.complete(r);
73+
});
74+
} catch (Throwable t) {
75+
promise.complete(ExecutionResult.failure(t));
76+
}
77+
return null;
78+
};
6179

62-
try {
63-
if (!policy.isAsync())
64-
callable.call();
65-
else
66-
future.inject((Future) scheduler.schedule(callable, result.getWaitNanos(), TimeUnit.NANOSECONDS));
67-
} catch (Throwable t) {
68-
promise.completeExceptionally(t);
69-
}
80+
try {
81+
if (!policy.isAsync())
82+
callable.call();
83+
else
84+
future.inject((Future) scheduler.schedule(callable, result.getWaitNanos(), TimeUnit.NANOSECONDS));
85+
} catch (Throwable t) {
86+
promise.completeExceptionally(t);
87+
}
88+
89+
return promise.thenCompose(ss -> postExecuteAsync(ss, scheduler, future));
90+
}
7091

71-
return promise;
92+
return postExecuteAsync(result, scheduler, future);
93+
});
7294
}
7395
}

src/test/java/net/jodah/failsafe/ListenersTest.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ static class ListenerCounter {
6666
/** Per listener invocations */
6767
AtomicInteger invocations = new AtomicInteger();
6868

69-
/** Records a sync invocation of the {@code listener}. */
70-
void sync() {
69+
/** Records an invocation of the {@code listener}. */
70+
void record() {
7171
invocations.incrementAndGet();
7272
}
7373

@@ -113,30 +113,30 @@ private <T> FailsafeExecutor<T> registerListeners(RetryPolicy<T> retryPolicy, Ci
113113
Failsafe.with(retryPolicy, circuitBreaker) :
114114
Failsafe.with(fallback, retryPolicy, circuitBreaker);
115115

116-
retryPolicy.onAbort(e -> rpAbort.sync());
117-
retryPolicy.onFailedAttempt(e -> rpFailedAttempt.sync());
118-
retryPolicy.onRetriesExceeded(e -> rpRetriesExceeded.sync());
119-
retryPolicy.onRetry(e -> rpRetry.sync());
120-
retryPolicy.onSuccess(e -> rpSuccess.sync());
121-
retryPolicy.onFailure(e -> rpFailure.sync());
116+
retryPolicy.onAbort(e -> rpAbort.record());
117+
retryPolicy.onFailedAttempt(e -> rpFailedAttempt.record());
118+
retryPolicy.onRetriesExceeded(e -> rpRetriesExceeded.record());
119+
retryPolicy.onRetry(e -> rpRetry.record());
120+
retryPolicy.onSuccess(e -> rpSuccess.record());
121+
retryPolicy.onFailure(e -> rpFailure.record());
122122

123-
circuitBreaker.onOpen(() -> cbOpen.sync());
124-
circuitBreaker.onHalfOpen(() -> cbHalfOpen.sync());
125-
circuitBreaker.onClose(() -> cbClose.sync());
126-
circuitBreaker.onSuccess(e -> cbSuccess.sync());
127-
circuitBreaker.onFailure(e -> cbFailure.sync());
123+
circuitBreaker.onOpen(() -> cbOpen.record());
124+
circuitBreaker.onHalfOpen(() -> cbHalfOpen.record());
125+
circuitBreaker.onClose(() -> cbClose.record());
126+
circuitBreaker.onSuccess(e -> cbSuccess.record());
127+
circuitBreaker.onFailure(e -> cbFailure.record());
128128

129129
if (fallback != null) {
130-
fallback.onSuccess(e -> fbSuccess.sync());
131-
fallback.onFailure(e -> fbFailure.sync());
130+
fallback.onSuccess(e -> fbSuccess.record());
131+
fallback.onFailure(e -> fbFailure.record());
132132
}
133133

134134
failsafe.onComplete(e -> {
135-
complete.sync();
135+
complete.record();
136136
waiter.resume();
137137
});
138-
failsafe.onSuccess(e -> success.sync());
139-
failsafe.onFailure(e -> failure.sync());
138+
failsafe.onSuccess(e -> success.record());
139+
failsafe.onFailure(e -> failure.record());
140140

141141
return failsafe;
142142
}
@@ -416,7 +416,7 @@ private void assertForFailingFallback(boolean sync) throws Throwable {
416416
CircuitBreaker<Object> circuitBreaker = new CircuitBreaker<>().withDelay(Duration.ZERO)
417417
.handle(NullPointerException.class);
418418
// And failing Fallback
419-
Fallback<Object> fallback = Fallback.ofAsync(() -> true);
419+
Fallback<Object> fallback = Fallback.ofAsync(() -> { throw new Exception(); });
420420
FailsafeExecutor<Object> failsafe = registerListeners(retryPolicy, circuitBreaker, fallback);
421421

422422
// When
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package net.jodah.failsafe.issues;
2+
3+
import net.jodah.failsafe.Failsafe;
4+
import net.jodah.failsafe.Fallback;
5+
import net.jodah.failsafe.RetryPolicy;
6+
import org.testng.annotations.BeforeMethod;
7+
import org.testng.annotations.Test;
8+
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
import static org.testng.Assert.*;
12+
13+
@Test
14+
public class Issue284Test {
15+
AtomicBoolean success;
16+
AtomicBoolean failure;
17+
AtomicBoolean executed;
18+
Fallback<String> fallback;
19+
RetryPolicy<String> retryPolicy = new RetryPolicy<String>().handleResult(null)
20+
.onSuccess(e -> success.set(true))
21+
.onFailure(e -> failure.set(true));
22+
23+
@BeforeMethod
24+
protected void beforeMethod() {
25+
success = new AtomicBoolean();
26+
failure = new AtomicBoolean();
27+
executed = new AtomicBoolean();
28+
}
29+
30+
private Fallback<String> fallbackFor(String result) {
31+
return Fallback.of(result).handleResult(null).onSuccess(e -> success.set(true)).onFailure(e -> failure.set(true));
32+
}
33+
34+
public void testFallbackSuccess() {
35+
fallback = fallbackFor("hello");
36+
String result = Failsafe.with(fallback).get(() -> null);
37+
38+
assertEquals(result, "hello");
39+
assertTrue(success.get(), "Fallback should have been successful");
40+
}
41+
42+
public void testFallbackFailure() {
43+
fallback = fallbackFor(null);
44+
String result = Failsafe.with(fallback).get(() -> null);
45+
46+
assertNull(result);
47+
assertTrue(failure.get(), "Fallback should have failed");
48+
}
49+
50+
public void testRetryPolicySuccess() {
51+
String result = Failsafe.with(retryPolicy).get(() -> !executed.getAndSet(true) ? null : "hello");
52+
53+
assertEquals(result, "hello");
54+
assertTrue(success.get(), "RetryPolicy should have been successful");
55+
}
56+
57+
public void testRetryPolicyFailure() {
58+
String result = Failsafe.with(retryPolicy).get(() -> null);
59+
60+
assertNull(result);
61+
assertTrue(failure.get(), "RetryPolicy should have failed");
62+
}
63+
}

0 commit comments

Comments
 (0)