KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator (apache#18661)

Guard against the coordinator getting stuck due to deferred events
throwing exceptions.
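
To make the failure mode concrete, here is a minimal, self-contained sketch (a hypothetical stub of the DeferredEvent interface and a simplified driver loop, not the runtime's actual code) of how one throwing completion could previously abort the loop and leave the remaining events, and with them the coordinator, stuck:

import java.util.List;

public class DeferredEventStallSketch {
    // Simplified stand-in for the runtime's DeferredEvent interface.
    interface DeferredEvent {
        void complete(Throwable t);
    }

    public static void main(String[] args) {
        List<DeferredEvent> events = List.of(
            t -> { throw new IllegalStateException("completion failed"); },
            t -> System.out.println("event 2 completed")
        );

        // Pre-fix behavior: the first event's exception propagates out of
        // the loop, so "event 2 completed" never prints and the second
        // event is never completed.
        for (DeferredEvent event : events) {
            event.complete(null);
        }
    }
}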

Reviewers: David Jacot <[email protected]>
squah-confluent authored Jan 22, 2025
1 parent 9da516b commit 5a57473
Showing 2 changed files with 216 additions and 10 deletions.
@@ -460,7 +460,7 @@ public int size() {
* A simple container class to hold all the attributes
* related to a pending batch.
*/
-    private static class CoordinatorBatch {
+    private class CoordinatorBatch {
/**
* The base (or first) offset of the batch. If the batch fails
     * for any reason, the state machine is rolled back to it.
@@ -500,9 +500,9 @@ private static class CoordinatorBatch {
final Optional<TimerTask> lingerTimeoutTask;

/**
-     * The list of deferred events associated with the batch.
+     * The deferred events associated with the batch.
      */
-    final List<DeferredEvent> deferredEvents;
+    final DeferredEventCollection deferredEvents;

/**
* The next offset. This is updated when records
@@ -527,7 +527,7 @@ private static class CoordinatorBatch {
this.buffer = buffer;
this.builder = builder;
this.lingerTimeoutTask = lingerTimeoutTask;
-        this.deferredEvents = new ArrayList<>();
+        this.deferredEvents = new DeferredEventCollection();
}
}

@@ -806,9 +806,7 @@ private void flushCurrentBatch() {
}

// Add all the pending deferred events to the deferred event queue.
-    for (DeferredEvent event : currentBatch.deferredEvents) {
-        deferredEventQueue.add(offset, event);
-    }
+    deferredEventQueue.add(offset, currentBatch.deferredEvents);

// Free up the current batch.
freeCurrentBatch();
@@ -839,9 +837,7 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
private void failCurrentBatch(Throwable t) {
if (currentBatch != null) {
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
-        for (DeferredEvent event : currentBatch.deferredEvents) {
-            event.complete(t);
-        }
+        currentBatch.deferredEvents.complete(t);
freeCurrentBatch();
}
}
@@ -1157,6 +1153,38 @@ public void run() {
}
}

/**
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
* and logs any exceptions thrown.
*/
class DeferredEventCollection implements DeferredEvent {
private final List<DeferredEvent> events = new ArrayList<>();

@Override
public void complete(Throwable t) {
for (DeferredEvent event : events) {
try {
event.complete(t);
} catch (Throwable e) {
log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
}
}
}

public boolean add(DeferredEvent event) {
return events.add(event);
}

public int size() {
return events.size();
}

@Override
public String toString() {
return "DeferredEventCollection(events=" + events + ")";
}
}
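
For illustration, here is a standalone sketch of the pattern DeferredEventCollection applies. It is a hypothetical re-implementation with a stubbed DeferredEvent and System.err standing in for log.error, since the real class is an inner class of the runtime and not directly instantiable here:

import java.util.ArrayList;
import java.util.List;

public class DeferredEventCollectionSketch {
    interface DeferredEvent {
        void complete(Throwable t);
    }

    static class DeferredEventCollection implements DeferredEvent {
        private final List<DeferredEvent> events = new ArrayList<>();

        public boolean add(DeferredEvent event) {
            return events.add(event);
        }

        @Override
        public void complete(Throwable t) {
            for (DeferredEvent event : events) {
                try {
                    event.complete(t);
                } catch (Throwable e) {
                    // Stand-in for the runtime's log.error call.
                    System.err.println("Completion of event failed: " + e);
                }
            }
        }
    }

    public static void main(String[] args) {
        DeferredEventCollection collection = new DeferredEventCollection();
        collection.add(t -> { throw new IllegalStateException("bad completion"); });
        collection.add(t -> System.out.println("second event completed"));

        // The first event's exception is caught and logged, so the second
        // event still completes and nothing is left hanging.
        collection.complete(null);
    }
}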

/**
* A coordinator write operation.
*
@@ -94,8 +94,10 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -1116,6 +1118,105 @@ public void testScheduleUnloadingWithStalePartitionEpoch() {
assertEquals(10, ctx.epoch);
}

@Test
public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);

// Make every deferred event completion throw: completing an event records
// the event purgatory time, and the metric is stubbed below to throw.
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(metrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.withExecutor(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);

// Load the coordinator.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create records with three quarters of the max batch size each, so that it is not
// possible to have more than one record in a single batch.
List<String> records = Stream.of('1', '2', '3').map(c -> {
char[] payload = new char[maxBatchSize * 3 / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
);

// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
);

// Write #3, to force the flush of write #2.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(2)), "response3")
);

// Records have been written to the log.
assertEquals(List.of(
records(timer.time().milliseconds(), records.get(0)),
records(timer.time().milliseconds(), records.get(1))
), writer.entries(TP));

// Verify that no writes are committed yet.
assertFalse(write1.isDone());
assertFalse(write2.isDone());
assertFalse(write3.isDone());

// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
assertEquals(CLOSED, ctx.state);

// All write completions throw exceptions after completing their futures.
// Despite the exceptions, the unload should still complete.
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertTrue(write3.isDone());
assertFutureThrows(write1, NotCoordinatorException.class);
assertFutureThrows(write2, NotCoordinatorException.class);
assertFutureThrows(write3, NotCoordinatorException.class);

// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();

// Getting the coordinator context fails because it no longer exists.
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
}

@Test
public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
@@ -3080,6 +3181,83 @@ public void testHighWatermarkUpdate() {
assertTrue(write2.isDone());
}

@Test
public void testHighWatermarkUpdateWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);

// Make every deferred event completion throw: completing an event records
// the event purgatory time, and the metric is stubbed below to throw.
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(metrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Load the coordinator.
runtime.scheduleLoadOperation(TP, 10);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create records with three quarters of the max batch size each, so that it is not
// possible to have more than one record in a single batch.
List<String> records = Stream.of('1', '2', '3').map(c -> {
char[] payload = new char[maxBatchSize * 3 / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
);

// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
);

// Write #3, to force the flush of write #2.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of(records.get(2)), "response3")
);

// Records have been written to the log.
assertEquals(List.of(
records(timer.time().milliseconds(), records.get(0)),
records(timer.time().milliseconds(), records.get(1))
), writer.entries(TP));

// Verify that no writes are committed yet.
assertFalse(write1.isDone());
assertFalse(write2.isDone());
assertFalse(write3.isDone());

// Commit the first and second record.
writer.commit(TP, 2);

// Write #1 and write #2's completions throw exceptions after completing their futures.
// Despite the exception from write #1, write #2 should still be completed.
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertFalse(write3.isDone());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
}

@Test
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
MockTimer timer = new MockTimer();
