KAFKA-18723; Better handle invalid records during replication #18852

Open · wants to merge 14 commits into base: trunk · showing changes from 11 commits
8 changes: 8 additions & 0 deletions build.gradle
@@ -1095,6 +1095,7 @@ project(':core') {
testImplementation project(':test-common:test-common-util')
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation libs.jqwik
testImplementation(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis'
// `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
@@ -1289,6 +1290,12 @@ project(':core') {
)
}

test {
useJUnitPlatform {
includeEngines 'jqwik', 'junit-jupiter'
}
}

tasks.create(name: "copyDependantTestLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('*.jar')
@@ -1860,6 +1867,7 @@ project(':clients') {
testImplementation libs.jacksonJakartarsJsonProvider
testImplementation libs.jose4j
testImplementation libs.junitJupiter
testImplementation libs.jqwik
testImplementation libs.spotbugs
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
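
Since `useJUnitPlatform` with an explicit `includeEngines` list restricts test discovery to the named engines, the jqwik engine has to be listed alongside junit-jupiter or `@Property` methods would be silently skipped. As a minimal sketch of such a property test (a hypothetical example, not part of this PR's diff):

    import net.jqwik.api.ForAll;
    import net.jqwik.api.Property;

    class CommutativityProperties {
        // jqwik generates many (a, b) pairs and reports a shrunk
        // counterexample if the property ever returns false.
        @Property
        boolean additionIsCommutative(@ForAll int a, @ForAll int b) {
            return a + b == b + a;
        }
    }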
@@ -159,7 +159,7 @@ public void ensureValid() {

/**
* Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas.
*
*
* @return The base timestamp
*/
public long baseTimestamp() {
@@ -502,6 +502,7 @@ public static void writeHeader(ByteBuffer buffer,
public String toString() {
return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
"sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
"partitionLeaderEpoch=" + partitionLeaderEpoch() + ", " +
"isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " +
"compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
}
@@ -33,9 +33,6 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -50,7 +47,6 @@
* or one of the {@link #builder(ByteBuffer, byte, Compression, TimestampType, long)} variants.
*/
public class MemoryRecords extends AbstractRecords {
private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

private final ByteBuffer buffer;
@@ -602,7 +598,7 @@ public static MemoryRecords withRecords(byte magic, long initialOffset, Compress
return withRecords(magic, initialOffset, compression, TimestampType.CREATE_TIME, records);
}

public static MemoryRecords withRecords(long initialOffset, Compression compression, Integer partitionLeaderEpoch, SimpleRecord... records) {
public static MemoryRecords withRecords(long initialOffset, Compression compression, int partitionLeaderEpoch, SimpleRecord... records) {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compression, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
}
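
For illustration, a minimal caller of the updated overload might look like this sketch (the class name and values are hypothetical, and `Compression.NONE` assumes the `org.apache.kafka.common.compress.Compression` API on trunk):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.compress.Compression;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.SimpleRecord;

    public class WithRecordsExample {
        public static void main(String[] args) {
            // partitionLeaderEpoch is now a primitive int, so callers can no longer pass null.
            MemoryRecords records = MemoryRecords.withRecords(
                0L,               // initialOffset
                Compression.NONE,
                5,                // partitionLeaderEpoch
                new SimpleRecord("key".getBytes(StandardCharsets.UTF_8),
                                 "value".getBytes(StandardCharsets.UTF_8)));
            System.out.println(records.sizeInBytes());
        }
    }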
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import net.jqwik.api.Arbitraries;
import net.jqwik.api.Arbitrary;
import net.jqwik.api.ArbitrarySupplier;

import java.nio.ByteBuffer;
import java.util.Random;

public final class ArbitraryMemoryRecords implements ArbitrarySupplier<MemoryRecords> {
@Override
public Arbitrary<MemoryRecords> get() {
return Arbitraries.randomValue(ArbitraryMemoryRecords::buildRandomRecords);
}

private static MemoryRecords buildRandomRecords(Random random) {
int size = random.nextInt(128) + 1;
byte[] bytes = new byte[size];
random.nextBytes(bytes);

return MemoryRecords.readableRecords(ByteBuffer.wrap(bytes));
}
}
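
A sketch of how such a supplier is typically consumed, assuming jqwik 1.6+ where `@ForAll` accepts a `supplier` attribute (the test class and assertion target are hypothetical, not this PR's actual tests):

    package org.apache.kafka.common.record;

    import net.jqwik.api.ForAll;
    import net.jqwik.api.Property;

    class MemoryRecordsProperties {
        // Random bytes are almost never a valid batch, so this drives the
        // error-handling paths of whatever parses the records.
        @Property
        void parserToleratesArbitraryBytes(
                @ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) {
            // Hypothetical target: e.g. assert that appendAsFollower either
            // appends cleanly or throws CorruptRecordException, never corrupts state.
        }
    }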
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.errors.CorruptRecordException;

import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.stream.Stream;

public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
// Use a base offset that is not zero so that it is less likely to match the log end offset (LEO)
private static final long BASE_OFFSET = 1234;
private static final int EPOCH = 4321;

/**
* Returns a stream of arguments for invalid memory records and the expected exception.
*
* The first object in the {@code Arguments} is a {@code MemoryRecords}.
*
* The second object in the {@code Arguments} is an {@code Optional<Class<Exception>>} which is
* the expected exception from the log layer.
*/
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.of(MemoryRecords.readableRecords(notEnoughBytes()), Optional.empty()),
Arguments.of(MemoryRecords.readableRecords(recordsSizeTooSmall()), Optional.of(CorruptRecordException.class)),
Arguments.of(MemoryRecords.readableRecords(notEnoughBytesToMagic()), Optional.empty()),
Arguments.of(MemoryRecords.readableRecords(negativeMagic()), Optional.of(CorruptRecordException.class)),
Arguments.of(MemoryRecords.readableRecords(largeMagic()), Optional.of(CorruptRecordException.class)),
Arguments.of(MemoryRecords.readableRecords(lessBytesThanRecordSize()), Optional.empty())
);
}

private static ByteBuffer notEnoughBytes() {
[Review comment, Member] typo: Enought -> Enough
var buffer = ByteBuffer.allocate(Records.LOG_OVERHEAD - 1);
buffer.limit(buffer.capacity());

return buffer;
}

private static ByteBuffer recordsSizeTooSmall() {
var buffer = ByteBuffer.allocate(256);
// Write the base offset
buffer.putLong(BASE_OFFSET);
// Write record size
buffer.putInt(LegacyRecord.RECORD_OVERHEAD_V0 - 1);
buffer.position(0);
buffer.limit(buffer.capacity());

return buffer;
}

private static ByteBuffer notEnoughBytesToMagic() {
var buffer = ByteBuffer.allocate(256);
// Write the base offset
buffer.putLong(BASE_OFFSET);
// Write record size
buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
buffer.position(0);
buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC - 1);

return buffer;
}

private static ByteBuffer negativeMagic() {
var buffer = ByteBuffer.allocate(256);
// Write the base offset
buffer.putLong(BASE_OFFSET);
// Write record size
buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
// Write the epoch
buffer.putInt(EPOCH);
// Write magic
buffer.put((byte) -1);
buffer.position(0);
buffer.limit(buffer.capacity());

return buffer;
}

private static ByteBuffer largeMagic() {
var buffer = ByteBuffer.allocate(256);
// Write the base offset
buffer.putLong(BASE_OFFSET);
// Write record size
buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
// Write the epoch
buffer.putInt(EPOCH);
// Write magic
buffer.put((byte) (RecordBatch.CURRENT_MAGIC_VALUE + 1));
buffer.position(0);
buffer.limit(buffer.capacity());

return buffer;
}

private static ByteBuffer lessBytesThanRecordSize() {
var buffer = ByteBuffer.allocate(256);
// Write the base offset
buffer.putLong(BASE_OFFSET);
// Write record size
buffer.putInt(buffer.capacity() - Records.LOG_OVERHEAD);
// Write the epoch
buffer.putInt(EPOCH);
// Write magic
buffer.put(RecordBatch.CURRENT_MAGIC_VALUE);
buffer.position(0);
buffer.limit(buffer.capacity() - Records.LOG_OVERHEAD - 1);

return buffer;
}
}
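
A sketch of how an `ArgumentsProvider` like this is wired into a JUnit 5 test (the test class and the `appendUnderTest` target are hypothetical stand-ins for the PR's real log-layer tests):

    package org.apache.kafka.common.record;

    import java.util.Optional;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ArgumentsSource;

    import static org.junit.jupiter.api.Assertions.assertThrows;

    class InvalidMemoryRecordsTest {
        @ParameterizedTest
        @ArgumentsSource(InvalidMemoryRecordsProvider.class)
        void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expected) {
            if (expected.isPresent()) {
                // Corrupt sizes and magic values should surface as the declared exception.
                assertThrows(expected.get(), () -> appendUnderTest(records));
            } else {
                // Truncated-but-well-formed prefixes should be tolerated without throwing.
                appendUnderTest(records);
            }
        }

        private void appendUnderTest(MemoryRecords records) {
            // Placeholder for the real call, e.g. log.appendAsFollower(records, epoch).
        }
    }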
20 changes: 14 additions & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1302,27 +1302,35 @@ class Partition(val topicPartition: TopicPartition,
}
}

private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
private def doAppendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean,
partitionLeaderEpoch: Int
): Option[LogAppendInfo] = {
if (isFuture) {
// The read lock is needed to handle race condition if request handler thread tries to
// remove future replica after receiving AlterReplicaLogDirsRequest.
inReadLock(leaderIsrUpdateLock) {
// Note the replica may be undefined if it is removed by a non-ReplicaAlterLogDirsThread before
// this method is called
futureLog.map { _.appendAsFollower(records) }
futureLog.map { _.appendAsFollower(records, partitionLeaderEpoch) }
}
} else {
// The lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
futureLogLock.synchronized {
Some(localLogOrException.appendAsFollower(records))
Some(localLogOrException.appendAsFollower(records, partitionLeaderEpoch))
}
}
}

def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
def appendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean,
partitionLeaderEpoch: Int
): Option[LogAppendInfo] = {
try {
doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
doAppendRecordsToFollowerOrFutureReplica(records, isFuture, partitionLeaderEpoch)
} catch {
case e: UnexpectedAppendOffsetException =>
val log = if (isFuture) futureLocalLogOrException else localLogOrException
@@ -1340,7 +1348,7 @@
info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${log.logStartOffset}." +
s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
truncateFullyAndStartAt(e.firstOffset, isFuture)
doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
doAppendRecordsToFollowerOrFutureReplica(records, isFuture, partitionLeaderEpoch)
} else
throw e
}
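
Threading `partitionLeaderEpoch` down to `appendAsFollower` lets the log layer sanity-check each batch against the epoch the fetcher is operating at. A plausible shape of that check, written as an illustrative Java sketch rather than the PR's exact validation:

    import org.apache.kafka.common.InvalidRecordException;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MutableRecordBatch;

    public final class FollowerAppendValidation {
        // Illustrative invariant: a follower should never accept a batch that
        // claims a leader epoch newer than the epoch it is currently fetching at.
        static void validateBatchEpochs(MemoryRecords records, int partitionLeaderEpoch) {
            for (MutableRecordBatch batch : records.batches()) {
                if (batch.partitionLeaderEpoch() > partitionLeaderEpoch) {
                    throw new InvalidRecordException(
                        "Batch epoch " + batch.partitionLeaderEpoch() +
                        " is greater than the fetcher's leader epoch " + partitionLeaderEpoch);
                }
            }
        }
    }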