
Commit 04d34f3

KAFKA-17124: Fix flaky DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset (#16580)
This change should fix the flakiness reported for DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset. I was not able to reproduce it locally, but the issue was that the second segment was not created in time, so the test failed with: Missing required argument "[files]"

The fix consists of getting the log path directly from the rolled segment. We were also creating the log twice, which produced this warning:

[2024-07-12 00:57:28,368] WARN [LocalLog partition=kafka-832386, dir=/tmp/kafka-2956913950351159820] Trying to roll a new log segment with start offset 0 =max(provided offset = Some(0), LEO = 0) while it already exists and is active with size 0. Size of time index: 873813, size of offset index: 1310720. (kafka.log.LocalLog:70)

This is also fixed.

Reviewers: Luke Chen <[email protected]>
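In outline, the fix rolls the second segment explicitly and points the tool at that segment's own file, instead of listing the log directory and assuming both .log files already exist. A minimal sketch of that flow, lifted from the test change below (metadataRecords and runDumpLogSegments are the test's existing helpers):

    // Roll a new segment and append the second batch directly to it, so the
    // segment is guaranteed to exist before the tool runs.
    val secondSegment = log.roll()
    secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L,
      MemoryRecords.withRecords(Compression.NONE, metadataRecords: _*))
    secondSegment.flush()
    log.flush(true)

    // Use the rolled segment's own path rather than logDir.listFiles(...)(1).
    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder",
      "--files", secondSegment.log().file().getAbsolutePath))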
1 parent 9a9eb18 commit 04d34f3

File tree

1 file changed: +39 −26 lines changed


core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

+39 −26
@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde}
 import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
@@ -52,7 +52,7 @@ import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.snapshot.RecordsSnapshotWriter
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, Test}
 
 import java.nio.file.attribute.PosixFilePermissions
 import java.nio.file.{AccessDeniedException, Files, NoSuchFileException, Paths}
@@ -73,12 +73,14 @@ class DumpLogSegmentsTest {
   val indexFilePath = s"$logDir/$segmentName.index"
   val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
   val time = new MockTime(0, 0)
-
-  val batches = new ArrayBuffer[BatchInfo]
   var log: UnifiedLog = _
 
-  @BeforeEach
-  def setUp(): Unit = {
+  @AfterEach
+  def afterEach(): Unit = {
+    Option(log).foreach(log => Utils.closeQuietly(log, "UnifiedLog"))
+  }
+
+  private def createTestLog = {
     val props = new Properties
     props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
     log = UnifiedLog(
@@ -96,9 +98,10 @@ class DumpLogSegmentsTest {
9698
topicId = None,
9799
keepPartitionMetadataFile = true
98100
)
101+
log
99102
}
100103

101-
def addSimpleRecords(): Unit = {
104+
private def addSimpleRecords(log: UnifiedLog, batches: ArrayBuffer[BatchInfo]): Unit = {
102105
val now = System.currentTimeMillis()
103106
val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
104107
batches += BatchInfo(firstBatchRecords, hasKeys = true, hasValues = true)
@@ -117,14 +120,10 @@ class DumpLogSegmentsTest {
     log.flush(false)
   }
 
-  @AfterEach
-  def tearDown(): Unit = {
-    Utils.closeQuietly(log, "UnifiedLog")
-    Utils.delete(tmpDir)
-  }
-
   @Test
   def testBatchAndRecordMetadataOutput(): Unit = {
+    log = createTestLog
+
     log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 0,
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes)
@@ -154,12 +153,15 @@ class DumpLogSegmentsTest {
       new EndTransactionMarker(ControlRecordType.COMMIT, 100)
     ), origin = AppendOrigin.COORDINATOR, leaderEpoch = 7)
 
-    assertDumpLogRecordMetadata()
+    assertDumpLogRecordMetadata(log)
   }
 
   @Test
   def testPrintDataLog(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+
     def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
       def isBatch(index: Int): Boolean = {
         var i = 0
@@ -229,7 +231,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpIndexMismatches(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+
     val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
     DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true, offsetMismatches,
       Int.MaxValue)
@@ -238,7 +243,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpTimeIndexErrors(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+
     val errors = new TimeIndexDumpErrors
     DumpLogSegments.dumpTimeIndex(new File(timeIndexFilePath), indexSanityOnly = false, verifyOnly = true, errors)
     assertEquals(Map.empty, errors.misMatchesForTimeIndexFilesMap)
@@ -384,18 +392,18 @@ class DumpLogSegmentsTest {
       new SimpleRecord(null, new RemoteLogMetadataSerde().serialize(message))
     }).toArray
 
-    val memoryRecordsSizeInBytes = MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*).sizeInBytes()
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = memoryRecordsSizeInBytes)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
     log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
+    val secondSegment = log.roll();
+    secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+    secondSegment.flush()
     log.flush(true)
-
-    val logPaths = logDir.listFiles.filter(_.getName.endsWith(".log")).map(_.getAbsolutePath)
+
     val expectedDeletePayload = String.format("RemotePartitionDeleteMetadata{topicPartition=%s:%s-0, " +
       "state=DELETE_PARTITION_MARKED, eventTimestampMs=0, brokerId=0}", topicId, topicName)
 
-    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", "--files", logPaths(1)))
+    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", "--files", secondSegment.log().file().getAbsolutePath))
     assertTrue(batchCount(output) == 1)
     assertTrue(recordCount(output) == 1)
     assertTrue(output.contains("Log starting offset: 1"))
@@ -584,6 +592,8 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpEmptyIndex(): Unit = {
+    log = createTestLog
+
     val indexFile = new File(indexFilePath)
     new PrintWriter(indexFile).close()
     val expectOutput = s"$indexFile is empty.\n"
@@ -605,7 +615,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testPrintDataLogPartialBatches(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+
     val totalBatches = batches.size
     val partialBatches = totalBatches / 2
 
@@ -914,7 +927,7 @@ class DumpLogSegmentsTest {
     fields.toMap
   }
 
-  private def assertDumpLogRecordMetadata(): Unit = {
+  private def assertDumpLogRecordMetadata(log: UnifiedLog): Unit = {
     val logReadInfo = log.read(
       startOffset = 0,
       maxLength = Int.MaxValue,
@@ -977,4 +990,4 @@ object DumpLogSegmentsTest {
 class TestDecoderWithoutVerifiableProperties() extends kafka.serializer.Decoder[Array[Byte]] {
   override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
 }
-}
+}
