Skip to content

Commit 9a5eb45

Browse files
committed
Add a Bulkhead policy
1 parent 0b5d1b4 commit 9a5eb45

12 files changed

+618
-3
lines changed

CHANGELOG.md

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1-
# 3.1
1+
# 3.2.0
22

33
### Improvements
44

5-
- Issue #308 - Introduce a `RateLimiter` policy.
5+
- Issue #309 - Introduced a `Bulkhead` policy.
6+
7+
# 3.1.0
8+
9+
### Improvements
10+
11+
- Issue #308 - Introduced a `RateLimiter` policy.
612

713
# 3.0.2
814

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
[![JavaDoc](https://img.shields.io/maven-central/v/dev.failsafe/failsafe.svg?maxAge=60&label=javadoc)](https://failsafe.dev/javadoc/)
77
[![Join the chat at https://gitter.im/jhalterman/failsafe](https://badges.gitter.im/jhalterman/failsafe.svg)](https://gitter.im/jhalterman/failsafe)
88

9-
Failsafe is a lightweight, zero-dependency library for handling failures in Java 8+, with a concise API for handling everyday use cases and the flexibility to handle everything else. It works by wrapping executable logic with one or more resilience policies, which can be combined and composed as needed. Current policies include [Retry](https://failsafe.dev/retry/), [CircuitBreaker](https://failsafe.dev/circuit-breaker/), [RateLimiter](https://failsafe.dev/rate-limiter/), [Timeout](https://failsafe.dev/timeout/), and [Fallback](https://failsafe.dev/fallback/).
9+
Failsafe is a lightweight, zero-dependency library for handling failures in Java 8+, with a concise API for handling everyday use cases and the flexibility to handle everything else. It works by wrapping executable logic with one or more resilience policies, which can be combined and composed as needed. Current policies include [Retry](https://failsafe.dev/retry/), [CircuitBreaker](https://failsafe.dev/circuit-breaker/), [RateLimiter](https://failsafe.dev/rate-limiter/), [Timeout](https://failsafe.dev/timeout/), [Bulkhead](https://failsafe.dev/bulkhead/), and [Fallback](https://failsafe.dev/fallback/).
1010

1111
## Usage
1212

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
import dev.failsafe.internal.BulkheadImpl;
19+
20+
import java.time.Duration;
21+
22+
/**
23+
* A bulkhead allows you to restrict concurrent executions as a way of preventing system overload.
24+
* <p>
25+
* This class is threadsafe.
26+
* </p>
27+
*
28+
* @param <R> result type
29+
* @author Jonathan Halterman
30+
* @see BulkheadConfig
31+
* @see BulkheadBuilder
32+
* @see BulkheadFullException
33+
*/
34+
public interface Bulkhead<R> extends Policy<R> {
35+
/**
36+
* Returns a Bulkhead for the {@code maxConcurrency} that has {@link BulkheadBuilder#withMaxWaitTime(Duration) zero
37+
* wait} and is {@link BulkheadBuilder#withFairness() not fair} by default.
38+
*
39+
* @param maxConcurrency controls the max concurrent executions that are permitted within the bulkhead
40+
*/
41+
static <R> BulkheadBuilder<R> builder(int maxConcurrency) {
42+
return new BulkheadBuilder<>(maxConcurrency);
43+
}
44+
45+
/**
46+
* Creates a new BulkheadBuilder that will be based on the {@code config}. The built bulkhead is {@link
47+
* BulkheadBuilder#withFairness() not fair} by default.
48+
*/
49+
static <R> BulkheadBuilder<R> builder(BulkheadConfig<R> config) {
50+
return new BulkheadBuilder<>(config);
51+
}
52+
53+
/**
54+
* Returns a Bulkhead for the {@code maxConcurrency} that has {@link BulkheadBuilder#withMaxWaitTime(Duration) zero
55+
* wait} and is {@link BulkheadBuilder#withFairness() not fair} by default. Alias for {@code
56+
* Bulkhead.builder(maxConcurrency).build()}. To configure additional options on a Bulkhead, use {@link #builder(int)}
57+
* instead.
58+
*
59+
* @param maxConcurrency controls the max concurrent executions that are permitted within the bulkhead
60+
* @see #builder(int)
61+
*/
62+
static <R> Bulkhead<R> of(int maxConcurrency) {
63+
return new BulkheadImpl<>(new BulkheadConfig<>(maxConcurrency));
64+
}
65+
66+
/**
67+
* Returns the {@link BulkheadConfig} that the Bulkhead was built with.
68+
*/
69+
@Override
70+
BulkheadConfig<R> getConfig();
71+
72+
/**
73+
* Attempts to acquire a permit to perform an execution against within the bulkhead, waiting until one is available or
74+
* the thread is interrupted. After execution is complete, the permit should be {@link #releasePermit() released} back
75+
* to the bulkhead.
76+
*
77+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
78+
* @see #tryAcquirePermit()
79+
*/
80+
void acquirePermit() throws InterruptedException;
81+
82+
/**
83+
* Attempts to acquire a permit to perform an execution within the bulkhead, waiting up to the {@code maxWaitTime}
84+
* until one is available, else throwing {@link BulkheadFullException} if a permit will not be available in time.
85+
* After execution is complete, the permit should be {@link #releasePermit() released} back to the bulkhead.
86+
*
87+
* @throws NullPointerException if {@code maxWaitTime} is null
88+
* @throws BulkheadFullException if the bulkhead cannot acquire a permit within the {@code maxWaitTime}
89+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
90+
* @see #tryAcquirePermit(Duration)
91+
*/
92+
default void acquirePermit(Duration maxWaitTime) throws InterruptedException {
93+
if (!tryAcquirePermit(maxWaitTime))
94+
throw new BulkheadFullException(this);
95+
}
96+
97+
/**
98+
* Tries to acquire a permit to perform an execution within the bulkhead, returning immediately without waiting. After
99+
* execution is complete, the permit should be {@link #releasePermit() released} back to the bulkhead.
100+
*
101+
* @return whether the requested {@code permits} are successfully acquired or not
102+
*/
103+
boolean tryAcquirePermit();
104+
105+
/**
106+
* Tries to acquire a permit to perform an execution within the bulkhead, waiting up to the {@code maxWaitTime} until
107+
* they are available. After execution is complete, the permit should be {@link #releasePermit() released} back to the
108+
* bulkhead.
109+
*
110+
* @return whether a permit is successfully acquired
111+
* @throws NullPointerException if {@code maxWaitTime} is null
112+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
113+
*/
114+
boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException;
115+
116+
/**
117+
* Releases a permit to execute.
118+
*/
119+
void releasePermit();
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
import dev.failsafe.internal.BulkheadImpl;
19+
import dev.failsafe.internal.util.Assert;
20+
21+
import java.time.Duration;
22+
23+
/**
24+
* Builds {@link Bulkhead} instances.
25+
* <p>
26+
* This class is <i>not</i> threadsafe.
27+
* </p>
28+
*
29+
* @param <R> result type
30+
* @author Jonathan Halterman
31+
* @see BulkheadConfig
32+
* @see BulkheadFullException
33+
*/
34+
public class BulkheadBuilder<R> extends PolicyBuilder<BulkheadBuilder<R>, BulkheadConfig<R>, R> {
35+
BulkheadBuilder(int maxConcurrency) {
36+
super(new BulkheadConfig<>(maxConcurrency));
37+
}
38+
39+
BulkheadBuilder(BulkheadConfig<R> config) {
40+
super(new BulkheadConfig<>(config));
41+
}
42+
43+
/**
44+
* Builds a new {@link Bulkhead} using the builder's configuration.
45+
*/
46+
public Bulkhead<R> build() {
47+
return new BulkheadImpl<>(new BulkheadConfig<>(config));
48+
}
49+
50+
/**
51+
* Configures the {@code maxWaitTime} to wait for permits to be available. If permits cannot be acquired before the
52+
* {@code maxWaitTime} is exceeded, then the bulkhead will throw {@link BulkheadFullException}.
53+
*
54+
* @throws NullPointerException if {@code maxWaitTime} is null
55+
*/
56+
public BulkheadBuilder<R> withMaxWaitTime(Duration maxWaitTime) {
57+
config.maxWaitTime = Assert.notNull(maxWaitTime, "maxWaitTime");
58+
return this;
59+
}
60+
61+
/**
62+
* Configures the bulkhead to be fair in permitting waiting execution in order.
63+
*/
64+
public BulkheadBuilder<R> withFairness() {
65+
config.fair = true;
66+
return this;
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
import java.time.Duration;
19+
20+
/**
21+
* Configuration for a {@link Bulkhead}.
22+
*
23+
* @param <R> result type
24+
* @author Jonathan Halterman
25+
*/
26+
public class BulkheadConfig<R> extends PolicyConfig<R> {
27+
int maxConcurrency;
28+
Duration maxWaitTime;
29+
boolean fair;
30+
31+
BulkheadConfig(int maxConcurrency) {
32+
this.maxConcurrency = maxConcurrency;
33+
maxWaitTime = Duration.ZERO;
34+
}
35+
36+
BulkheadConfig(BulkheadConfig<R> config) {
37+
super(config);
38+
maxConcurrency = config.maxConcurrency;
39+
maxWaitTime = config.maxWaitTime;
40+
fair = config.fair;
41+
}
42+
43+
/**
44+
* Returns that max concurrent executions that are permitted within the bulkhead.
45+
*
46+
* @see Bulkhead#builder(int)
47+
*/
48+
public int getMaxConcurrency() {
49+
return maxConcurrency;
50+
}
51+
52+
/**
53+
* Returns the max time to wait for permits to be available. If permits cannot be acquired before the max wait time is
54+
* exceeded, then the bulkhead will throw {@link BulkheadFullException}.
55+
*
56+
* @see BulkheadBuilder#withMaxWaitTime(Duration)
57+
*/
58+
public Duration getMaxWaitTime() {
59+
return maxWaitTime;
60+
}
61+
62+
/**
63+
* Returns whether the Bulkhead is fair in permitting waiting executions in order.
64+
*
65+
* @see BulkheadBuilder#withFairness()
66+
*/
67+
public boolean isFair() {
68+
return fair;
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
/**
19+
* Thrown when an execution is attempted against a {@link Bulkhead} that is full.
20+
*
21+
* @author Jonathan Halterman
22+
*/
23+
public class BulkheadFullException extends FailsafeException {
24+
private static final long serialVersionUID = 1L;
25+
26+
private final Bulkhead<?> bulkhead;
27+
28+
public BulkheadFullException(Bulkhead<?> bulkhead) {
29+
this.bulkhead = bulkhead;
30+
}
31+
32+
/** Returns the {@link Bulkhead} that caused the exception. */
33+
public Bulkhead<?> getBulkhead() {
34+
return bulkhead;
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe.internal;
17+
18+
import dev.failsafe.Bulkhead;
19+
import dev.failsafe.BulkheadFullException;
20+
import dev.failsafe.ExecutionContext;
21+
import dev.failsafe.RateLimitExceededException;
22+
import dev.failsafe.spi.ExecutionResult;
23+
import dev.failsafe.spi.PolicyExecutor;
24+
25+
import java.time.Duration;
26+
27+
/**
28+
* A PolicyExecutor that handles failures according to a {@link Bulkhead}.
29+
*
30+
* @param <R> result type
31+
* @author Jonathan Halterman
32+
*/
33+
public class BulkheadExecutor<R> extends PolicyExecutor<R> {
34+
private final BulkheadImpl<R> bulkhead;
35+
private final Duration maxWaitTime;
36+
37+
public BulkheadExecutor(BulkheadImpl<R> bulkhead, int policyIndex) {
38+
super(bulkhead, policyIndex);
39+
this.bulkhead = bulkhead;
40+
maxWaitTime = bulkhead.getConfig().getMaxWaitTime();
41+
}
42+
43+
@Override
44+
protected ExecutionResult<R> preExecute() {
45+
try {
46+
return bulkhead.tryAcquirePermit(maxWaitTime) ?
47+
null :
48+
ExecutionResult.failure(new BulkheadFullException(bulkhead));
49+
} catch (InterruptedException e) {
50+
// Set interrupt flag
51+
Thread.currentThread().interrupt();
52+
return ExecutionResult.failure(e);
53+
}
54+
}
55+
56+
@Override
57+
public void onSuccess(ExecutionResult<R> result) {
58+
bulkhead.releasePermit();
59+
}
60+
61+
@Override
62+
protected ExecutionResult<R> onFailure(ExecutionContext<R> context, ExecutionResult<R> result) {
63+
bulkhead.releasePermit();
64+
return result;
65+
}
66+
}

0 commit comments

Comments
 (0)