+ Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
+ if (logOpt.isEmpty()) {
+ future.completeExceptionally(new NotLeaderOrFollowerException(
+ "Could not load records from " + tp + " because the log does not exist."));
+ return;
+ }
+
+ UnifiedLog log = logOpt.get();
+
+ // Buffer may not be needed if records are read from memory.
+ ByteBuffer buffer = ByteBuffer.allocate(0);
+ long currentOffset = log.logStartOffset();
+ LoadStats stats = new LoadStats();
+
+ long previousHighWatermark = -1L;
+ while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
+ FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
+
+ stats.readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes() > 0;
+
+ MemoryRecords memoryRecords = toReadableMemoryRecords(tp, fetchDataInfo.records, buffer);
+ if (fetchDataInfo.records instanceof FileRecords) {
+ buffer = memoryRecords.buffer();
+ }
+
+ ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
+ currentOffset = replayResult.nextOffset;
+ previousHighWatermark = replayResult.highWatermark;
+ }
+
+ long endTimeMs = time.milliseconds();
+
+ if (logEndOffset(tp) == -1L) {
+ future.completeExceptionally(new NotLeaderOrFollowerException(
+ String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)
+ ));
+ } else if (isRunning.get()) {
+ future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
+ } else {
+ future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
+ }
+ } catch (Throwable ex) {
+ future.completeExceptionally(ex);
+ }
+ }
+
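+ /**
+ * Returns the log end offset of the partition, or -1L when the partition is not online
+ * or this broker is no longer the leader.
+ */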
+ private long logEndOffset(TopicPartition tp) {
+ return partitionLogEndOffsetSupplier.apply(tp).orElse(-1L);
+ }
+
+ /**
+ * Returns true if it's still valid to fetch the next batch of records.
+ *
+ * Fetching continues only while all of the following hold:
+ *
+ * - The current offset is less than the log end offset.
+ * - At least one record was read in the previous fetch. This ensures that fetching stops even if the
+ * current offset remains smaller than the log end offset but the log is empty. This could happen with compacted topics.
+ * - The log end offset is not -1L, which ensures the partition is online and this broker is still the
+ * leader. This is implied by the first condition, since the current offset is never negative.
+ * - The loader is still running.
+ */
+ private boolean shouldFetchNextBatch(long currentOffset, long logEndOffset, boolean readAtLeastOneRecord) {
+ return currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get();
+ }
+
+ private MemoryRecords toReadableMemoryRecords(TopicPartition tp, Records records, ByteBuffer buffer) throws IOException {
+ if (records instanceof MemoryRecords memoryRecords) {
+ return memoryRecords;
+ } else if (records instanceof FileRecords fileRecords) {
+ int sizeInBytes = fileRecords.sizeInBytes();
+ int bytesNeeded = Math.max(loadBufferSize, sizeInBytes);
+
+ // minOneMessage = true in the above log.read() means that the buffer may need to
+ // be grown to ensure progress can be made.
+ if (buffer.capacity() < bytesNeeded) {
+ if (loadBufferSize < bytesNeeded) {
+ LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than" +
+ " configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
+ }
+
+ buffer = ByteBuffer.allocate(bytesNeeded);
+ } else {
+ buffer.clear();
+ }
+
+ fileRecords.readInto(buffer, 0);
+ return MemoryRecords.readableRecords(buffer);
+ } else {
+ throw new IllegalArgumentException("Unsupported record type: " + records.getClass());
+ }
+ }
+
+ private ReplayResult processMemoryRecords(
+ TopicPartition tp,
+ UnifiedLog log,
+ MemoryRecords memoryRecords,
+ CoordinatorPlayback coordinator,
+ LoadStats loadStats,
+ long currentOffset,
+ long previousHighWatermark
+ ) {
+
+ for (MutableRecordBatch batch : memoryRecords.batches()) {
+ if (batch.isControlBatch()) {
+ for (Record record : batch) {
+ ControlRecordType controlRecord = ControlRecordType.parse(record.key());
+ if (controlRecord == ControlRecordType.COMMIT) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replaying end transaction marker from {} at offset {} to commit" +
+ " transaction with producer id {} and producer epoch {}.",
+ tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ }
+ coordinator.replayEndTransactionMarker(
+ batch.producerId(),
+ batch.producerEpoch(),
+ TransactionResult.COMMIT
+ );
+ } else if (controlRecord == ControlRecordType.ABORT) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replaying end transaction marker from {} at offset {} to abort" +
+ " transaction with producer id {} and producer epoch {}.",
+ tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ }
+ coordinator.replayEndTransactionMarker(
+ batch.producerId(),
+ batch.producerEpoch(),
+ TransactionResult.ABORT
+ );
+ }
+ }
+ } else {
+ for (Record record : batch) {
+ loadStats.numRecords++;
+
+ Optional<T> coordinatorRecordOpt = Optional.empty();
+ try {
+ coordinatorRecordOpt = Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
+ } catch (Deserializer.UnknownRecordTypeException ex) {
+ LOG.warn("Unknown record type {} while loading offsets and group metadata from {}." +
+ " Ignoring it. It could be a leftover from an aborted upgrade.", ex.unknownType(), tp);
+ } catch (RuntimeException ex) {
+ String msg = String.format("Deserializing record %s from %s failed due to: %s", record, tp, ex.getMessage());
+ LOG.error(msg);
+ throw new RuntimeException(msg, ex);
+ }
+
+ coordinatorRecordOpt.ifPresent(coordinatorRecord -> {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replaying record {} from {} at offset {} with producer id {}" +
+ " and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
+ }
+ coordinator.replay(
+ record.offset(),
+ batch.producerId(),
+ batch.producerEpoch(),
+ coordinatorRecord
+ );
+ } catch (RuntimeException ex) {
+ String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and" +
+ " producer epoch %d failed due to: %s.", coordinatorRecord, tp, record.offset(),
+ batch.producerId(), batch.producerEpoch(), ex.getMessage());
+ LOG.error(msg);
+ throw new RuntimeException(msg, ex);
+ }
+ });
+ }
+ }
+
+ // Note that the high watermark can be greater than the current offset but as we load more records
+ // the current offset will eventually surpass the high watermark. Also note that the high watermark
+ // will continue to advance while loading.
+ currentOffset = batch.nextOffset();
+ long currentHighWatermark = log.highWatermark();
+ if (currentOffset >= currentHighWatermark) {
+ coordinator.updateLastWrittenOffset(currentOffset);
+
+ if (currentHighWatermark > previousHighWatermark) {
+ coordinator.updateLastCommittedOffset(currentHighWatermark);
+ previousHighWatermark = currentHighWatermark;
+ }
+ }
+ }
+ loadStats.numBytes += memoryRecords.sizeInBytes();
+ return new ReplayResult(currentOffset, previousHighWatermark);
+ }
+
+ /**
+ * Closes the loader.
+ */
+ @Override
+ public void close() throws Exception {
+ if (!isRunning.compareAndSet(true, false)) {
+ LOG.warn("Coordinator loader is already shutting down.");
+ return;
+ }
+ scheduler.shutdown();
+ }
+
+ /**
+ * A helper class to track key metrics during the data loading operation.
+ */
+ private static class LoadStats {
+ private long numRecords = 0L;
+ private long numBytes = 0L;
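+ // Starts as true so the first fetch always runs; updated after every read.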
+ private boolean readAtLeastOneRecord = true;
+
+ @Override
+ public String toString() {
+ return "LoadStats{" +
+ "numRecords=" + numRecords +
+ ", numBytes=" + numBytes +
+ ", readAtLeastOneRecord=" + readAtLeastOneRecord +
+ '}';
+ }
+ }
+
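+ /**
+ * Holds the next offset to fetch from and the highest high watermark propagated
+ * to the coordinator so far while replaying records.
+ */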
+ private record ReplayResult(long nextOffset, long highWatermark) {
+ }
+}
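For context beyond the patch itself: the loader no longer depends on ReplicaManager, only on two java.util.function.Function suppliers. Below is a minimal wiring sketch; logStore, tp and coordinator are illustrative assumptions, while the constructor shape (Time, the two suppliers, a Deserializer, a load buffer size) comes from the code above. The BrokerServer.scala hunks later in this diff show the real wiring.

    // Sketch only: logStore, tp and coordinator are assumed to exist.
    Function<TopicPartition, Optional<UnifiedLog>> logSupplier = logStore::lookup;
    Function<TopicPartition, Optional<Long>> endOffsetSupplier = logStore::endOffset;

    try (CoordinatorLoaderImpl<CoordinatorRecord> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            logSupplier,
            endOffsetSupplier,
            new GroupCoordinatorRecordSerde(),
            5 * 1024 * 1024 // load buffer size
    )) {
        loader.load(tp, coordinator).get(30, TimeUnit.SECONDS);
    }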
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
new file mode 100644
index 0000000000000..0c375bce2d1d5
--- /dev/null
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.Option;
+
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static scala.jdk.javaapi.OptionConverters.toJava;
+import static scala.jdk.javaapi.OptionConverters.toScala;
+
+@SuppressWarnings("unchecked")
+@Timeout(60)
+class CoordinatorLoaderImplTest {
+
+ private record Tuple<K, V>(K key, V value) {
+ }
+
+ private static class StringKeyValueDeserializer implements Deserializer<Tuple<String, String>> {
+
+ @Override
+ public Tuple<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
+ return new Tuple<>(
+ StandardCharsets.UTF_8.decode(key).toString(),
+ StandardCharsets.UTF_8.decode(value).toString()
+ );
+ }
+ }
+
+ @Test
+ void testNonexistentPartition() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(Option.empty());
+
+ assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
+ }
+ }
+
+ @Test
+ void testLoadingIsRejectedWhenClosed() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ loader.close();
+ assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
+ }
+ }
+
+ @Test
+ void testLoading() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L)));
+ when(log.highWatermark()).thenReturn(0L);
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult1);
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ FetchDataInfo readResult3 = logReadResult(5, 100L, (short) 5, Arrays.asList(
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ ));
+
+ when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult3);
+
+ FetchDataInfo readResult4 = logReadResult(
+ 7,
+ 100L,
+ (short) 5,
+ ControlRecordType.COMMIT
+ );
+
+ when(log.read(7L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult4);
+
+ FetchDataInfo readResult5 = logReadResult(
+ 8,
+ 500L,
+ (short) 10,
+ ControlRecordType.ABORT
+ );
+
+ when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult5);
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
+ verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6", "v6"));
+ verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7", "v7"));
+ verify(coordinator).replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT);
+ verify(coordinator).replayEndTransactionMarker(500L, (short) 10, TransactionResult.ABORT);
+ verify(coordinator).updateLastWrittenOffset(2L);
+ verify(coordinator).updateLastWrittenOffset(5L);
+ verify(coordinator).updateLastWrittenOffset(7L);
+ verify(coordinator).updateLastWrittenOffset(8L);
+ verify(coordinator).updateLastCommittedOffset(0L);
+ }
+ }
+
+ @Test
+ void testLoadingStoppedWhenClosed() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L)));
+
+ FetchDataInfo readResult = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ when(log.read(
+ anyLong(),
+ eq(1000),
+ eq(FetchIsolation.LOG_END),
+ eq(true)
+ )).thenAnswer((InvocationOnMock invocation) -> {
+ latch.countDown();
+ return readResult;
+ });
+
+ CompletableFuture<CoordinatorLoader.LoadSummary> result = loader.load(tp, coordinator);
+ boolean completed = latch.await(10, TimeUnit.SECONDS);
+ assertTrue(completed, "Log read timeout: Latch did not count down in time.");
+ loader.close();
+
+ RuntimeException ex = assertFutureThrows(RuntimeException.class, result);
+ assertNotNull(ex);
+ assertEquals("Coordinator loader is closed.", ex.getMessage());
+ }
+ }
+
+ @Test
+ void testUnknownRecordTypeAreIgnored() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
+
+ FetchDataInfo readResult = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult);
+
+ when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
+ .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
+ .thenReturn(new Tuple<>("k2", "v2"));
+
+ loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
+
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+ }
+ }
+
+ @Test
+ void testDeserializationErrorFailsTheLoading() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
+
+ FetchDataInfo readResult = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult);
+
+ when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
+ .thenThrow(new RuntimeException("Error!"));
+
+ RuntimeException ex = assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
+
+ assertNotNull(ex);
+ assertEquals(String.format("Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from %s failed due to: Error!", tp), ex.getMessage());
+ }
+ }
+
+ @Test
+ void testLoadGroupAndOffsetsWithCorruptedLog() throws Exception {
+ // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+ // when all the records are expired and the active segment is truncated or when the partition
+ // is accidentally corrupted.
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L)));
+
+ FetchDataInfo readResult = logReadResult(0, List.of());
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult);
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void testLoadSummary() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+ MockTime time = new MockTime();
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ time,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ long startTimeMs = time.milliseconds();
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L)));
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenAnswer((InvocationOnMock invocation) -> {
+ time.sleep(1000);
+ return readResult1;
+ });
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
+ assertEquals(startTimeMs, summary.startTimeMs());
+ assertEquals(startTimeMs + 1000, summary.endTimeMs());
+ assertEquals(5, summary.numRecords());
+ assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes());
+ }
+ }
+
+ @Test
+ void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(log.highWatermark()).thenReturn(0L, 0L, 2L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult1);
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ ));
+
+ when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult3);
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+ verify(coordinator, times(0)).updateLastWrittenOffset(0L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(2L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(5L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(7L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(0L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(2L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(5L);
+ }
+ }
+
+ @Test
+ void testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(log.highWatermark()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L)));
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+ verify(coordinator, times(0)).updateLastWrittenOffset(anyLong());
+ verify(coordinator, times(0)).updateLastCommittedOffset(anyLong());
+ }
+ }
+
+ @Test
+ void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(log.highWatermark()).thenReturn(5L, 7L, 7L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult1);
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
+ new SimpleRecord("k6".getBytes(), "v6".getBytes()),
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ ));
+
+ when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult3);
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
+ verify(coordinator, times(0)).updateLastWrittenOffset(0L);
+ verify(coordinator, times(0)).updateLastWrittenOffset(2L);
+ verify(coordinator, times(0)).updateLastWrittenOffset(5L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(7L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(0L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(2L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(5L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(7L);
+ }
+ }
+
+ @Test
+ void testPartitionGoesOfflineDuringLoad() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
+ Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ UnifiedLog log = mock(UnifiedLog.class);
+ CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000
+ )) {
+ when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
+ when(log.logStartOffset()).thenReturn(0L);
+ when(log.highWatermark()).thenReturn(0L);
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L)));
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult1);
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
+ }
+ }
+
+ private FetchDataInfo logReadResult(long startOffset, List<SimpleRecord> records) throws IOException {
+ return logReadResult(startOffset, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, records);
+ }
+
+ private FetchDataInfo logReadResult(
+ long startOffset,
+ long producerId,
+ short producerEpoch,
+ List<SimpleRecord> records
+ ) throws IOException {
+ FileRecords fileRecords = mock(FileRecords.class);
+ MemoryRecords memoryRecords;
+ if (producerId == RecordBatch.NO_PRODUCER_ID) {
+ memoryRecords = MemoryRecords.withRecords(
+ startOffset,
+ Compression.NONE,
+ records.toArray(new SimpleRecord[0])
+ );
+ } else {
+ memoryRecords = MemoryRecords.withTransactionalRecords(
+ startOffset,
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ 0,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ records.toArray(new SimpleRecord[0])
+ );
+ }
+
+ when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());
+
+ ArgumentCaptor<ByteBuffer> bufferCapture = ArgumentCaptor.forClass(ByteBuffer.class);
+ doAnswer(invocation -> {
+ ByteBuffer buffer = bufferCapture.getValue();
+ buffer.put(memoryRecords.buffer().duplicate());
+ buffer.flip();
+ return null;
+ }).when(fileRecords).readInto(bufferCapture.capture(), ArgumentMatchers.anyInt());
+
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords);
+ }
+
+ private FetchDataInfo logReadResult(
+ long startOffset,
+ long producerId,
+ short producerEpoch,
+ ControlRecordType controlRecordType
+ ) throws IOException {
+ FileRecords fileRecords = mock(FileRecords.class);
+ MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
+ startOffset,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(controlRecordType, 0)
+ );
+
+ when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());
+
+ ArgumentCaptor<ByteBuffer> bufferCapture = ArgumentCaptor.forClass(ByteBuffer.class);
+ doAnswer(invocation -> {
+ ByteBuffer buffer = bufferCapture.getValue();
+ buffer.put(memoryRecords.buffer().duplicate());
+ buffer.flip();
+ return null;
+ }).when(fileRecords).readInto(bufferCapture.capture(), ArgumentMatchers.anyInt());
+
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords);
+ }
+
+}
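One detail worth calling out in the tests above: ReplicaManager is still a Scala API, so its scala.Option results are bridged to java.util.Optional through the statically imported scala.jdk.javaapi.OptionConverters methods. A minimal sketch of that round trip, assuming a mocked replicaManager, log and tp as in the tests:

    // scala.Option -> java.util.Optional, as the partitionLogSupplier lambdas do:
    Optional<UnifiedLog> javaOpt = toJava(replicaManager.getLog(tp));
    // ... and java.util.Optional -> scala.Option, as the stubbing does:
    when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));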
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
deleted file mode 100644
index 44a9b9565841d..0000000000000
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.group
-
-import kafka.server.ReplicaManager
-import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
-import org.apache.kafka.common.requests.TransactionResult
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
-import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
-import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer}
-import org.apache.kafka.server.storage.log.FetchIsolation
-import org.apache.kafka.server.util.KafkaScheduler
-
-import java.nio.ByteBuffer
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.atomic.AtomicBoolean
-import scala.jdk.CollectionConverters._
-
-/**
- * Coordinator loader which reads records from a partition and replays them
- * to a group coordinator.
- *
- * @param replicaManager The replica manager.
- * @param deserializer The deserializer to use.
- * @param loadBufferSize The load buffer size.
- * @tparam T The record type.
- */
-class CoordinatorLoaderImpl[T](
- time: Time,
- replicaManager: ReplicaManager,
- deserializer: Deserializer[T],
- loadBufferSize: Int
-) extends CoordinatorLoader[T] with Logging {
- private val isRunning = new AtomicBoolean(true)
- private val scheduler = new KafkaScheduler(1)
- scheduler.startup()
-
- /**
- * Loads the coordinator by reading all the records from the TopicPartition
- * and applying them to the Replayable object.
- *
- * @param tp The TopicPartition to read from.
- * @param coordinator The object to apply records to.
- */
- override def load(
- tp: TopicPartition,
- coordinator: CoordinatorPlayback[T]
-): CompletableFuture[LoadSummary] = {
- val future = new CompletableFuture[LoadSummary]()
- val startTimeMs = time.milliseconds()
- val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
- () => doLoad(tp, coordinator, future, startTimeMs))
- if (result.isCancelled) {
- future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
- }
- future
- }
-
- private def doLoad(
- tp: TopicPartition,
- coordinator: CoordinatorPlayback[T],
- future: CompletableFuture[LoadSummary],
- startTimeMs: Long
- ): Unit = {
- val schedulerQueueTimeMs = time.milliseconds() - startTimeMs
- try {
- replicaManager.getLog(tp) match {
- case None =>
- future.completeExceptionally(new NotLeaderOrFollowerException(
- s"Could not load records from $tp because the log does not exist."))
-
- case Some(log) =>
- def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L)
-
- // Buffer may not be needed if records are read from memory.
- var buffer = ByteBuffer.allocate(0)
- // Loop breaks if leader changes at any time during the load, since logEndOffset is -1.
- var currentOffset = log.logStartOffset
- // Loop breaks if no records have been read, since the end of the log has been reached.
- // This is to ensure that the loop breaks even if the current offset remains smaller than
- // the log end offset but the log is empty. This could happen with compacted topics.
- var readAtLeastOneRecord = true
-
- var previousHighWatermark = -1L
- var numRecords = 0L
- var numBytes = 0L
- while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
- val fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true)
-
- readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
-
- val memoryRecords = (fetchDataInfo.records: @unchecked) match {
- case records: MemoryRecords =>
- records
-
- case fileRecords: FileRecords =>
- val sizeInBytes = fileRecords.sizeInBytes
- val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
-
- // "minOneMessage = true in the above log.read() means that the buffer may need to
- // be grown to ensure progress can be made.
- if (buffer.capacity < bytesNeeded) {
- if (loadBufferSize < bytesNeeded)
- warn(s"Loaded metadata from $tp with buffer larger ($bytesNeeded bytes) than " +
- s"configured buffer size ($loadBufferSize bytes).")
-
- buffer = ByteBuffer.allocate(bytesNeeded)
- } else {
- buffer.clear()
- }
-
- fileRecords.readInto(buffer, 0)
- MemoryRecords.readableRecords(buffer)
- }
-
- memoryRecords.batches.forEach { batch =>
- if (batch.isControlBatch) {
- batch.asScala.foreach { record =>
- val controlRecord = ControlRecordType.parse(record.key)
- if (controlRecord == ControlRecordType.COMMIT) {
- if (isTraceEnabled) {
- trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to commit transaction " +
- s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
- }
- coordinator.replayEndTransactionMarker(
- batch.producerId,
- batch.producerEpoch,
- TransactionResult.COMMIT
- )
- } else if (controlRecord == ControlRecordType.ABORT) {
- if (isTraceEnabled) {
- trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to abort transaction " +
- s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
- }
- coordinator.replayEndTransactionMarker(
- batch.producerId,
- batch.producerEpoch,
- TransactionResult.ABORT
- )
- }
- }
- } else {
- batch.asScala.foreach { record =>
- numRecords = numRecords + 1
-
- val coordinatorRecordOpt = {
- try {
- Some(deserializer.deserialize(record.key, record.value))
- } catch {
- case ex: UnknownRecordTypeException =>
- warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
- s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.")
- None
- case ex: RuntimeException =>
- val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}"
- error(s"$msg.")
- throw new RuntimeException(msg, ex)
- }
- }
-
- coordinatorRecordOpt.foreach { coordinatorRecord =>
- try {
- if (isTraceEnabled) {
- trace(s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
- s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
- }
- coordinator.replay(
- record.offset(),
- batch.producerId,
- batch.producerEpoch,
- coordinatorRecord
- )
- } catch {
- case ex: RuntimeException =>
- val msg = s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
- s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch} " +
- s"failed due to: ${ex.getMessage}"
- error(s"$msg.")
- throw new RuntimeException(msg, ex)
- }
- }
- }
- }
-
- // Note that the high watermark can be greater than the current offset but as we load more records
- // the current offset will eventually surpass the high watermark. Also note that the high watermark
- // will continue to advance while loading.
- currentOffset = batch.nextOffset
- val currentHighWatermark = log.highWatermark
- if (currentOffset >= currentHighWatermark) {
- coordinator.updateLastWrittenOffset(currentOffset)
-
- if (currentHighWatermark > previousHighWatermark) {
- coordinator.updateLastCommittedOffset(currentHighWatermark)
- previousHighWatermark = currentHighWatermark
- }
- }
- }
- numBytes = numBytes + memoryRecords.sizeInBytes()
- }
-
- val endTimeMs = time.milliseconds()
-
- if (logEndOffset == -1L) {
- future.completeExceptionally(new NotLeaderOrFollowerException(
- s"Stopped loading records from $tp because the partition is not online or is no longer the leader."
- ))
- } else if (isRunning.get) {
- future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, numRecords, numBytes))
- } else {
- future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
- }
- }
- } catch {
- case ex: Throwable =>
- future.completeExceptionally(ex)
- }
- }
-
- /**
- * Closes the loader.
- */
- override def close(): Unit = {
- if (!isRunning.compareAndSet(true, false)) {
- warn("Coordinator loader is already shutting down.")
- return
- }
- scheduler.shutdown()
- }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index c3e666a736f4f..9af3492286e01 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter}
+import kafka.coordinator.group.CoordinatorPartitionWriter
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.network.SocketServer
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
+import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
@@ -610,7 +610,8 @@ class BrokerServer(
)
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
- replicaManager,
+ tp => replicaManager.getLog(tp).toJava,
+ tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.groupCoordinatorConfig.offsetsLoadBufferSize
)
@@ -640,7 +641,8 @@ class BrokerServer(
val serde = new ShareCoordinatorRecordSerde
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
- replicaManager,
+ tp => replicaManager.getLog(tp).toJava,
+ tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
deleted file mode 100644
index a0ca3f23c48fd..0000000000000
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ /dev/null
@@ -1,632 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.group
-
-import kafka.server.ReplicaManager
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.requests.TransactionResult
-import org.apache.kafka.common.utils.{MockTime, Time}
-import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
-import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer}
-import org.apache.kafka.server.storage.log.FetchIsolation
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, UnifiedLog}
-import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
-import org.junit.jupiter.api.{Test, Timeout}
-import org.mockito.ArgumentMatchers.anyLong
-import org.mockito.{ArgumentCaptor, ArgumentMatchers}
-import org.mockito.Mockito.{mock, times, verify, when}
-import org.mockito.invocation.InvocationOnMock
-
-import java.nio.ByteBuffer
-import java.nio.charset.Charset
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import scala.util.Using
-
-class StringKeyValueDeserializer extends Deserializer[(String, String)] {
- override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, String) = {
- (
- Charset.defaultCharset().decode(key).toString,
- Charset.defaultCharset().decode(value).toString
- )
- }
-}
-
-@Timeout(60)
-class CoordinatorLoaderImplTest {
- @Test
- def testNonexistentPartition(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = mock(classOf[Deserializer[(String, String)]])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(None)
-
- val result = loader.load(tp, coordinator)
- assertFutureThrows(classOf[NotLeaderOrFollowerException], result)
- }
- }
-
- @Test
- def testLoadingIsRejectedWhenClosed(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = mock(classOf[Deserializer[(String, String)]])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- loader.close()
-
- val result = loader.load(tp, coordinator)
- assertFutureThrows(classOf[RuntimeException], result)
- }
- }
-
- @Test
- def testLoading(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(9L))
- when(log.highWatermark).thenReturn(0L)
-
- val readResult1 = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true
- )).thenReturn(readResult1)
-
- val readResult2 = logReadResult(startOffset = 2, records = Seq(
- new SimpleRecord("k3".getBytes, "v3".getBytes),
- new SimpleRecord("k4".getBytes, "v4".getBytes),
- new SimpleRecord("k5".getBytes, "v5".getBytes)
- ))
-
- when(log.read(2L, 1000, FetchIsolation.LOG_END, true
- )).thenReturn(readResult2)
-
- val readResult3 = logReadResult(startOffset = 5, producerId = 100L, producerEpoch = 5, records = Seq(
- new SimpleRecord("k6".getBytes, "v6".getBytes),
- new SimpleRecord("k7".getBytes, "v7".getBytes)
- ))
-
- when(log.read(5L, 1000, FetchIsolation.LOG_END, true
- )).thenReturn(readResult3)
-
- val readResult4 = logReadResult(
- startOffset = 7,
- producerId = 100L,
- producerEpoch = 5,
- controlRecordType = ControlRecordType.COMMIT
- )
-
- when(log.read(7L, 1000, FetchIsolation.LOG_END, true
- )).thenReturn(readResult4)
-
- val readResult5 = logReadResult(
- startOffset = 8,
- producerId = 500L,
- producerEpoch = 10,
- controlRecordType = ControlRecordType.ABORT
- )
-
- when(log.read(8L, 1000, FetchIsolation.LOG_END, true
- )).thenReturn(readResult5)
-
- assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
-
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
- verify(coordinator).replay(5L, 100L, 5.toShort, ("k6", "v6"))
- verify(coordinator).replay(6L, 100L, 5.toShort, ("k7", "v7"))
- verify(coordinator).replayEndTransactionMarker(100L, 5, TransactionResult.COMMIT)
- verify(coordinator).replayEndTransactionMarker(500L, 10, TransactionResult.ABORT)
- verify(coordinator).updateLastWrittenOffset(2)
- verify(coordinator).updateLastWrittenOffset(5)
- verify(coordinator).updateLastWrittenOffset(7)
- verify(coordinator).updateLastWrittenOffset(8)
- verify(coordinator).updateLastCommittedOffset(0)
- }
- }
-
- @Test
- def testLoadingStoppedWhenClosed(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(100L))
-
- val readResult = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- val latch = new CountDownLatch(1)
- when(log.read(
- ArgumentMatchers.anyLong(),
- ArgumentMatchers.eq(1000),
- ArgumentMatchers.eq(FetchIsolation.LOG_END),
- ArgumentMatchers.eq(true)
- )).thenAnswer { _ =>
- latch.countDown()
- readResult
- }
-
- val result = loader.load(tp, coordinator)
- latch.await(10, TimeUnit.SECONDS)
- loader.close()
-
- val ex = assertFutureThrows(classOf[RuntimeException], result)
- assertEquals("Coordinator loader is closed.", ex.getMessage)
- }
- }
-
- @Test
- def testUnknownRecordTypeAreIgnored(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = mock(classOf[StringKeyValueDeserializer])
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
-
- val readResult = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
-
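- // The first record fails with an unknown record type and is skipped; the second one deserializes normally.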
- when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
- .thenThrow(new UnknownRecordTypeException(1))
- .thenReturn(("k2", "v2"))
-
- loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
-
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
- }
- }
-
- @Test
- def testDeserializationErrorFailsTheLoading(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = mock(classOf[StringKeyValueDeserializer])
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
-
- val readResult = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
-
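- // Unlike unknown record types, a generic deserialization error must fail the whole load.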
- when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
- .thenThrow(new RuntimeException("Error!"))
-
- val ex = assertFutureThrows(classOf[RuntimeException], loader.load(tp, coordinator))
-
- assertEquals(s"Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!", ex.getMessage)
- }
- }
-
- @Test
- def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
- // Simulate a case where startOffset < endOffset but the log is empty. This could theoretically
- // happen when all the records have expired and the active segment was truncated, or when the
- // partition is accidentally corrupted.
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = mock(classOf[StringKeyValueDeserializer])
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(10L))
-
- val readResult = logReadResult(startOffset = 0, records = Seq())
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
-
- assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
- }
- }
-
- @Test
- def testLoadSummary(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- val time = new MockTime()
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- val startTimeMs = time.milliseconds()
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
-
- val readResult1 = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
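- // Advance the mock clock during the first read so that the summary reports a 1000 ms load duration.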
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenAnswer((_: InvocationOnMock) => {
- time.sleep(1000)
- readResult1
- })
-
- val readResult2 = logReadResult(startOffset = 2, records = Seq(
- new SimpleRecord("k3".getBytes, "v3".getBytes),
- new SimpleRecord("k4".getBytes, "v4".getBytes),
- new SimpleRecord("k5".getBytes, "v5".getBytes)
- ))
-
- when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
-
- val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
- assertEquals(startTimeMs, summary.startTimeMs())
- assertEquals(startTimeMs + 1000, summary.endTimeMs())
- assertEquals(5, summary.numRecords())
- assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes())
- }
- }
-
- @Test
- def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
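- // Stub consecutive high watermark values to simulate the high watermark advancing while batches load.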
- when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
-
- val readResult1 = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
-
- val readResult2 = logReadResult(startOffset = 2, records = Seq(
- new SimpleRecord("k3".getBytes, "v3".getBytes),
- new SimpleRecord("k4".getBytes, "v4".getBytes),
- new SimpleRecord("k5".getBytes, "v5".getBytes)
- ))
-
- when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
-
- val readResult3 = logReadResult(startOffset = 5, records = Seq(
- new SimpleRecord("k6".getBytes, "v6".getBytes),
- new SimpleRecord("k7".getBytes, "v7".getBytes)
- ))
-
- when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
-
- assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
-
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
- verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
- verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
- verify(coordinator, times(0)).updateLastWrittenOffset(0)
- verify(coordinator, times(1)).updateLastWrittenOffset(2)
- verify(coordinator, times(1)).updateLastWrittenOffset(5)
- verify(coordinator, times(1)).updateLastWrittenOffset(7)
- verify(coordinator, times(1)).updateLastCommittedOffset(0)
- verify(coordinator, times(1)).updateLastCommittedOffset(2)
- verify(coordinator, times(0)).updateLastCommittedOffset(5)
- }
- }
-
- @Test
- def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(log.highWatermark).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
-
- assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
-
- verify(coordinator, times(0)).updateLastWrittenOffset(anyLong())
- verify(coordinator, times(0)).updateLastCommittedOffset(anyLong())
- }
- }
-
- @Test
- def testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
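- // The high watermark is already ahead of the first batches, so intermediate offsets are never committed.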
- when(log.highWatermark).thenReturn(5L).thenReturn(7L).thenReturn(7L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
-
- val readResult1 = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
-
- val readResult2 = logReadResult(startOffset = 2, records = Seq(
- new SimpleRecord("k3".getBytes, "v3".getBytes),
- new SimpleRecord("k4".getBytes, "v4".getBytes),
- new SimpleRecord("k5".getBytes, "v5".getBytes)
- ))
-
- when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
-
- val readResult3 = logReadResult(startOffset = 5, records = Seq(
- new SimpleRecord("k6".getBytes, "v6".getBytes),
- new SimpleRecord("k7".getBytes, "v7".getBytes)
- ))
-
- when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
-
- assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
-
- verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
- verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
- verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
- verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
- verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
- verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
- verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
- verify(coordinator, times(0)).updateLastWrittenOffset(0)
- verify(coordinator, times(0)).updateLastWrittenOffset(2)
- verify(coordinator, times(0)).updateLastWrittenOffset(5)
- verify(coordinator, times(1)).updateLastWrittenOffset(7)
- verify(coordinator, times(0)).updateLastCommittedOffset(0)
- verify(coordinator, times(0)).updateLastCommittedOffset(2)
- verify(coordinator, times(0)).updateLastCommittedOffset(5)
- verify(coordinator, times(1)).updateLastCommittedOffset(7)
- }
- }
-
- @Test
- def testPartitionGoesOfflineDuringLoad(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val replicaManager = mock(classOf[ReplicaManager])
- val serde = new StringKeyValueDeserializer
- val log = mock(classOf[UnifiedLog])
- val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
-
- Using.resource(new CoordinatorLoaderImpl[(String, String)](
- time = Time.SYSTEM,
- replicaManager = replicaManager,
- deserializer = serde,
- loadBufferSize = 1000
- )) { loader =>
- when(replicaManager.getLog(tp)).thenReturn(Some(log))
- when(log.logStartOffset).thenReturn(0L)
- when(log.highWatermark).thenReturn(0L)
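- // The log end offset becomes -1 on the second call, simulating the partition going offline mid-load.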
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)).thenReturn(Some(-1L))
-
- val readResult1 = logReadResult(startOffset = 0, records = Seq(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes)
- ))
-
- when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
-
- val readResult2 = logReadResult(startOffset = 2, records = Seq(
- new SimpleRecord("k3".getBytes, "v3".getBytes),
- new SimpleRecord("k4".getBytes, "v4".getBytes),
- new SimpleRecord("k5".getBytes, "v5".getBytes)
- ))
-
- when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
-
- assertFutureThrows(classOf[NotLeaderOrFollowerException], loader.load(tp, coordinator))
- }
- }
-
- private def logReadResult(
- startOffset: Long,
- producerId: Long = RecordBatch.NO_PRODUCER_ID,
- producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
- records: Seq[SimpleRecord]
- ): FetchDataInfo = {
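- // Wrap real MemoryRecords in a mocked FileRecords so the loader exercises its file-read path.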
- val fileRecords = mock(classOf[FileRecords])
- val memoryRecords = if (producerId == RecordBatch.NO_PRODUCER_ID) {
- MemoryRecords.withRecords(
- startOffset,
- Compression.NONE,
- records: _*
- )
- } else {
- MemoryRecords.withTransactionalRecords(
- startOffset,
- Compression.NONE,
- producerId,
- producerEpoch,
- 0,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
- records: _*
- )
- }
-
- when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
-
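- // Copy the MemoryRecords bytes into whichever buffer the loader passes to readInto.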
- val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
- when(fileRecords.readInto(
- bufferCapture.capture(),
- ArgumentMatchers.anyInt())
- ).thenAnswer { _ =>
- val buffer = bufferCapture.getValue
- buffer.put(memoryRecords.buffer.duplicate)
- buffer.flip()
- }
-
- new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
- }
-
- private def logReadResult(
- startOffset: Long,
- producerId: Long,
- producerEpoch: Short,
- controlRecordType: ControlRecordType
- ): FetchDataInfo = {
- val fileRecords = mock(classOf[FileRecords])
- val memoryRecords = MemoryRecords.withEndTransactionMarker(
- startOffset,
- 0L,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
- producerId,
- producerEpoch,
- new EndTransactionMarker(controlRecordType, 0)
- )
-
- when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
-
- val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
- when(fileRecords.readInto(
- bufferCapture.capture(),
- ArgumentMatchers.anyInt())
- ).thenAnswer { _ =>
- val buffer = bufferCapture.getValue
- buffer.put(memoryRecords.buffer.duplicate)
- buffer.flip()
- }
-
- new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
- }
-}