Skip to content

Commit 65f4bb1

Browse files
committed
Add test cases involving potentially blocked operations
1 parent bc78281 commit 65f4bb1

File tree

2 files changed

+144
-0
lines changed

2 files changed

+144
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package net.jodah.failsafe.functional;
2+
3+
import net.jodah.failsafe.*;
4+
import org.testng.annotations.Test;
5+
6+
import java.time.Duration;
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import static org.testng.Assert.assertFalse;
11+
12+
/**
13+
* Tests scenarios against a small threadpool where executions could be temporarily blocked.
14+
*/
15+
@Test
16+
public class BlockedExecutionTest {
17+
/**
18+
* Asserts that a scheduled execution that is blocked on a threadpool is properly cancelled when a timeout occurs.
19+
*/
20+
@Test
21+
public void shouldCancelScheduledExecutionOnTimeout() throws Throwable {
22+
ExecutorService executor = Executors.newSingleThreadExecutor();
23+
Timeout<Boolean> timeout = Timeout.of(Duration.ofMillis(100));
24+
AtomicBoolean supplierCalled = new AtomicBoolean();
25+
executor.submit(Testing.uncheck(() -> Thread.sleep(300)));
26+
27+
Future<Boolean> future = Failsafe.with(timeout).with(executor).getAsync(() -> {
28+
supplierCalled.set(true);
29+
return false;
30+
});
31+
32+
Asserts.assertThrows(() -> future.get(1000, TimeUnit.MILLISECONDS), ExecutionException.class,
33+
TimeoutExceededException.class);
34+
Thread.sleep(300);
35+
assertFalse(supplierCalled.get());
36+
}
37+
38+
/**
39+
* Asserts that a scheduled retry that is blocked on a threadpool is properly cancelled when a timeout occurs.
40+
*/
41+
public void shouldCancelScheduledRetryOnTimeout() {
42+
ExecutorService executor = Executors.newSingleThreadExecutor();
43+
Timeout<Boolean> timeout = Timeout.of(Duration.ofMillis(100));
44+
RetryPolicy<Boolean> rp = new RetryPolicy<Boolean>().withDelay(Duration.ofMillis(1000)).handleResult(false);
45+
46+
Future<Boolean> future = Failsafe.with(timeout, rp).with(executor).getAsync(() -> {
47+
// Tie up single thread immediately after execution, before the retry is scheduled
48+
executor.submit(Testing.uncheck(() -> Thread.sleep(1000)));
49+
return false;
50+
});
51+
52+
Asserts.assertThrows(() -> future.get(500, TimeUnit.MILLISECONDS), ExecutionException.class,
53+
TimeoutExceededException.class);
54+
}
55+
56+
/**
57+
* Asserts that a scheduled fallback that is blocked on a threadpool is properly cancelled when a timeout occurs.
58+
*/
59+
public void shouldCancelScheduledFallbackOnTimeout() {
60+
ExecutorService executor = Executors.newSingleThreadExecutor();
61+
Timeout<Boolean> timeout = Timeout.of(Duration.ofMillis(100));
62+
AtomicBoolean fallbackCalled = new AtomicBoolean();
63+
Fallback<Boolean> fallback = Fallback.ofAsync(() -> {
64+
fallbackCalled.set(true);
65+
return true;
66+
}).handleResult(false);
67+
68+
Future<Boolean> future = Failsafe.with(timeout, fallback).with(executor).getAsync(() -> {
69+
// Tie up single thread immediately after execution, before the fallback is scheduled
70+
executor.submit(Testing.uncheck(() -> Thread.sleep(1000)));
71+
return false;
72+
});
73+
74+
Asserts.assertThrows(() -> future.get(500, TimeUnit.MILLISECONDS), ExecutionException.class,
75+
TimeoutExceededException.class);
76+
assertFalse(fallbackCalled.get());
77+
}
78+
79+
/**
80+
* Asserts that a scheduled fallback that is blocked on a threadpool is properly cancelled when the outer future is
81+
* cancelled.
82+
*/
83+
public void shouldCancelScheduledFallbackOnCancel() throws Throwable {
84+
AtomicBoolean fallbackCalled = new AtomicBoolean();
85+
ExecutorService executor = Executors.newSingleThreadExecutor();
86+
Fallback<Boolean> fallback = Fallback.ofAsync(() -> {
87+
fallbackCalled.set(true);
88+
return true;
89+
}).handleResult(false);
90+
91+
Future<Boolean> future = Failsafe.with(fallback).with(executor).getAsync(() -> {
92+
executor.submit(Testing.uncheck(() -> Thread.sleep(300)));
93+
return false;
94+
});
95+
96+
Thread.sleep(100);
97+
future.cancel(false);
98+
Asserts.assertThrows(future::get, CancellationException.class);
99+
Thread.sleep(300);
100+
assertFalse(fallbackCalled.get());
101+
}
102+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.jodah.failsafe.issues;
2+
3+
import net.jodah.failsafe.Failsafe;
4+
import net.jodah.failsafe.RetryPolicy;
5+
import net.jodah.failsafe.Timeout;
6+
import net.jodah.failsafe.function.ContextualRunnable;
7+
import org.testng.annotations.Test;
8+
9+
import java.time.Duration;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.Future;
13+
import java.util.function.Function;
14+
15+
@Test
16+
public class Issue260Test {
17+
public void test() throws Throwable {
18+
ExecutorService executor = Executors.newSingleThreadExecutor();
19+
Timeout<Object> timeout = Timeout.of(Duration.ofMillis(300))
20+
.onFailure(e -> System.out.println("Interrupted"))
21+
.withInterrupt(true);
22+
RetryPolicy<Object> rp = new RetryPolicy<>().onRetry(e -> System.out.println("Retrying"))
23+
.onSuccess(e -> System.out.println("Success"));
24+
25+
Function<Integer, ContextualRunnable> task = (taskId) -> ctx -> {
26+
System.out.println("Starting execution of task " + taskId);
27+
try {
28+
Thread.sleep(200);
29+
} catch (InterruptedException e) {
30+
System.out.println("Interrupted task " + taskId);
31+
throw e;
32+
}
33+
};
34+
35+
Future<?> f1 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(1));
36+
Future<?> f2 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(2));
37+
Future<?> f3 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(3));
38+
f1.get();
39+
f2.get();
40+
f3.get();
41+
}
42+
}

0 commit comments

Comments
 (0)