From 0465677ae84c0a4ffe669a217bb0010dcdcfac07 Mon Sep 17 00:00:00 2001 From: abhishek-Jain24 Date: Fri, 17 Oct 2025 17:54:46 +0530 Subject: [PATCH] Add Bulkhead pattern implementation and demo Introduces Bulkhead pattern classes including configuration, metrics, and service bulkhead logic. Adds a demo application, test class, and documentation to illustrate usage and benefits of isolating concurrent executions to prevent cascading failures. --- bulkhead/App.java | 39 +++++++++++++++++++++++++ bulkhead/BulkheadConfig.java | 15 ++++++++++ bulkhead/BulkheadMetrics.java | 20 +++++++++++++ bulkhead/BulkheadPattern.java | 25 ++++++++++++++++ bulkhead/README.md | 20 +++++++++++++ bulkhead/ServiceBulkhead.java | 55 +++++++++++++++++++++++++++++++++++ bulkhead/TestBulkhead.java | 25 ++++++++++++++++ 7 files changed, 199 insertions(+) create mode 100644 bulkhead/App.java create mode 100644 bulkhead/BulkheadConfig.java create mode 100644 bulkhead/BulkheadMetrics.java create mode 100644 bulkhead/BulkheadPattern.java create mode 100644 bulkhead/README.md create mode 100644 bulkhead/ServiceBulkhead.java create mode 100644 bulkhead/TestBulkhead.java diff --git a/bulkhead/App.java b/bulkhead/App.java new file mode 100644 index 000000000000..10c34b2637ef --- /dev/null +++ b/bulkhead/App.java @@ -0,0 +1,39 @@ +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Demonstrates Bulkhead Pattern + */ +public class App { + + public static void main(String[] args) throws InterruptedException { + System.out.println("=== Bulkhead Pattern Demo ==="); + + // Create bulkhead with only 2 concurrent calls + BulkheadConfig config = new BulkheadConfig(2, 1000); + ServiceBulkhead bulkhead = BulkheadPattern.getBulkhead("PaymentService", config); + + // Simulate multiple service calls + List> futures = new ArrayList<>(); + + for (int i = 1; i <= 5; i++) { + final int taskId = i; + CompletableFuture future = bulkhead.execute(() -> { + System.out.println("Task " + taskId + " started - Available permits: " + + bulkhead.getAvailablePermits()); + Thread.sleep(2000); // Simulate work + System.out.println("Task " + taskId + " completed"); + return "Result-" + taskId; + }); + + futures.add(future); + } + + // Wait for completion + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + System.out.println("\nFinal metrics: " + bulkhead.getMetrics()); + + BulkheadPattern.shutdownAll(); + } +} \ No newline at end of file diff --git a/bulkhead/BulkheadConfig.java b/bulkhead/BulkheadConfig.java new file mode 100644 index 000000000000..de70a2c5fd2d --- /dev/null +++ b/bulkhead/BulkheadConfig.java @@ -0,0 +1,15 @@ +/** + * Configuration for Bulkhead pattern + */ +public class BulkheadConfig { + private final int maxConcurrentCalls; + private final long maxWaitTimeMs; + + public BulkheadConfig(int maxConcurrentCalls, long maxWaitTimeMs) { + this.maxConcurrentCalls = maxConcurrentCalls; + this.maxWaitTimeMs = maxWaitTimeMs; + } + + public int getMaxConcurrentCalls() { return maxConcurrentCalls; } + public long getMaxWaitTimeMs() { return maxWaitTimeMs; } +} \ No newline at end of file diff --git a/bulkhead/BulkheadMetrics.java b/bulkhead/BulkheadMetrics.java new file mode 100644 index 000000000000..23e9b55ddcc4 --- /dev/null +++ b/bulkhead/BulkheadMetrics.java @@ -0,0 +1,20 @@ +import java.util.concurrent.atomic.AtomicLong; + +/** + * Tracks bulkhead performance metrics + */ +public class BulkheadMetrics { + private final AtomicLong successfulCalls = new AtomicLong(); + private final AtomicLong failedCalls = new AtomicLong(); + private final AtomicLong rejectedCalls = new AtomicLong(); + + public void recordSuccessfulCall() { successfulCalls.incrementAndGet(); } + public void recordFailedCall() { failedCalls.incrementAndGet(); } + public void recordRejectedCall() { rejectedCalls.incrementAndGet(); } + + @Override + public String toString() { + return String.format("Metrics[successful=%d, failed=%d, rejected=%d]", + successfulCalls.get(), failedCalls.get(), rejectedCalls.get()); + } +} \ No newline at end of file diff --git a/bulkhead/BulkheadPattern.java b/bulkhead/BulkheadPattern.java new file mode 100644 index 000000000000..707175dc3906 --- /dev/null +++ b/bulkhead/BulkheadPattern.java @@ -0,0 +1,25 @@ +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Bulkhead Pattern - isolates application elements to prevent cascading failures + * + *

Inspired by ship bulkheads that prevent water from flooding entire vessel. + * Limits concurrent executions to protect system resources.

+ */ +public class BulkheadPattern { + private static final Map BULKHEADS = new ConcurrentHashMap<>(); + + public static ServiceBulkhead getBulkhead(String name, BulkheadConfig config) { + return BULKHEADS.computeIfAbsent(name, k -> new ServiceBulkhead(name, config)); + } + + public static ServiceBulkhead getBulkhead(String name) { + return getBulkhead(name, new BulkheadConfig(10, 1000)); + } + + public static void shutdownAll() { + BULKHEADS.values().forEach(ServiceBulkhead::shutdown); + BULKHEADS.clear(); + } +} \ No newline at end of file diff --git a/bulkhead/README.md b/bulkhead/README.md new file mode 100644 index 000000000000..d6820cf289ea --- /dev/null +++ b/bulkhead/README.md @@ -0,0 +1,20 @@ +# Bulkhead Pattern + +## Intent + +Isolate elements of an application into pools so that if one fails, others continue to function. Prevents cascading failures in distributed systems. + +## Explanation + +Similar to ship bulkheads that prevent flooding, this pattern limits concurrent executions to protect system resources. When a service becomes overloaded, the bulkhead contains the failure without affecting other services. + +## Usage + +```java +BulkheadConfig config = new BulkheadConfig(3, 1000); +ServiceBulkhead bulkhead = BulkheadPattern.getBulkhead("PaymentService", config); + +CompletableFuture result = bulkhead.execute(() -> { + // Your service call + return processPayment(); +}); diff --git a/bulkhead/ServiceBulkhead.java b/bulkhead/ServiceBulkhead.java new file mode 100644 index 000000000000..fa4d3f2058c6 --- /dev/null +++ b/bulkhead/ServiceBulkhead.java @@ -0,0 +1,55 @@ +import java.util.concurrent.*; + +/** + * Implements Bulkhead pattern to limit concurrent executions + */ +public class ServiceBulkhead { + private final Semaphore semaphore; + private final BulkheadConfig config; + private final String name; + private final BulkheadMetrics metrics; + private final ExecutorService executor; + + public ServiceBulkhead(String name, BulkheadConfig config) { + this.name = name; + this.config = config; + this.semaphore = new Semaphore(config.getMaxConcurrentCalls()); + this.metrics = new BulkheadMetrics(); + this.executor = Executors.newCachedThreadPool(); + } + + public CompletableFuture execute(Callable task) { + try { + if (!semaphore.tryAcquire(config.getMaxWaitTimeMs(), TimeUnit.MILLISECONDS)) { + metrics.recordRejectedCall(); + throw new RuntimeException("Bulkhead '" + name + "' timeout after " + + config.getMaxWaitTimeMs() + "ms"); + } + + return CompletableFuture.supplyAsync(() -> { + try { + T result = task.call(); + metrics.recordSuccessfulCall(); + return result; + } catch (Exception e) { + metrics.recordFailedCall(); + throw new CompletionException(e); + } finally { + semaphore.release(); + } + }, executor); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + metrics.recordRejectedCall(); + throw new RuntimeException("Bulkhead acquisition interrupted", e); + } + } + + public BulkheadMetrics getMetrics() { return metrics; } + public int getAvailablePermits() { return semaphore.availablePermits(); } + + public void shutdown() { + executor.shutdown(); + } +} \ No newline at end of file diff --git a/bulkhead/TestBulkhead.java b/bulkhead/TestBulkhead.java new file mode 100644 index 000000000000..6bd84e811eef --- /dev/null +++ b/bulkhead/TestBulkhead.java @@ -0,0 +1,25 @@ +import java.util.concurrent.CompletableFuture; + +public class TestBulkhead { + public static void main(String[] args) throws InterruptedException { + System.out.println("Testing Bulkhead Pattern..."); + + // Test 1: Basic functionality + BulkheadConfig config = new BulkheadConfig(2, 1000); + ServiceBulkhead bulkhead = BulkheadPattern.getBulkhead("TestService", config); + + // Execute a simple task + CompletableFuture future = bulkhead.execute(() -> { + Thread.sleep(500); + return "Success!"; + }); + + future.thenAccept(result -> System.out.println("Result: " + result)); + + Thread.sleep(1000); + System.out.println("Metrics: " + bulkhead.getMetrics()); + + bulkhead.shutdown(); + System.out.println("✅ Basic test passed!"); + } +} \ No newline at end of file