KAFKA-16207; KRaft's internal log listener to update voter set (#15671)
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters is updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.

Note that this commit changes the code and the in-memory state to track the sets of voters, but it doesn't change any data that is externally exposed: the RPCs, the data stored on disk and the configuration are all unchanged.

When the KRaft replica starts, the PartitionListener reads the latest snapshot and then the log segments up to the log end offset (LEO), updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader or follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot, the PartitionListener trims any prefix entries that are no longer needed. This is all done to minimize the amount of data tracked in memory and to make sure that it matches the state on disk.
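
The lifecycle described above (add entries while catching up, truncate on divergence, trim the prefix after a snapshot) can be illustrated with a small offset-indexed history. This is only a sketch and not the code in this commit: the class `OffsetHistory` and its method names are hypothetical. It assumes that truncation discards every entry at or after the new end offset, and that trimming keeps the newest entry at or before the snapshot offset so later lookups still resolve.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical illustration: tracks one value (for example a voter set) per log offset.
final class OffsetHistory<T> {
    private final NavigableMap<Long, T> entries = new TreeMap<>();

    // Record a value read from the log at the given offset; offsets must increase.
    void addAt(long offset, T value) {
        if (!entries.isEmpty() && entries.lastKey() >= offset) {
            throw new IllegalArgumentException(
                "Offset " + offset + " is not greater than the last offset " + entries.lastKey());
        }
        entries.put(offset, value);
    }

    // Latest value known to the replica, if any.
    Optional<T> lastValue() {
        return Optional.ofNullable(entries.lastEntry()).map(Map.Entry::getValue);
    }

    // Value in effect at the given offset: the newest entry at or before it.
    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(entries.floorEntry(offset)).map(Map.Entry::getValue);
    }

    // The log was truncated to a new end offset: drop every entry at or after it.
    void truncateNewEntries(long endOffset) {
        entries.tailMap(endOffset, true).clear();
    }

    // A snapshot was generated at startOffset: keep the newest entry at or
    // before it, and everything after it; drop the older prefix.
    void truncateOldEntries(long startOffset) {
        Long keep = entries.floorKey(startOffset);
        if (keep != null) {
            entries.headMap(keep, false).clear();
        }
    }

    int size() {
        return entries.size();
    }
}
```

A sorted map keeps both operations cheap: truncation clears a tail view and trimming clears a head view, so the in-memory state never holds more than the entries between the latest snapshot and the LEO.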

To implement the functionality described above this commit also makes the following changes:

Adds the control records KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.

Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.

Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.

Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.

Update the DumpLogSegments CLI to support the new control records KRaftVersionRecord and VotersRecord.

Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.

Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.

Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.

Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
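
The snapshot rule in the last item can be sketched as a small selection function. This is an illustration under stated assumptions, not the builder added by this commit: the class `SnapshotControlRecords`, the method `select` and the parameter `lastIncludedOffset` are hypothetical, lookups are assumed to be inclusive of `lastIncludedOffset`, and voters are represented only by integer ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

// Hypothetical illustration of which control records a snapshot would carry.
final class SnapshotControlRecords {
    // Returns the names of the control records to write, in order.
    static List<String> select(
        long lastIncludedOffset,
        NavigableMap<Long, Short> kraftVersions,
        NavigableMap<Long, Set<Integer>> voterSets
    ) {
        List<String> records = new ArrayList<>();

        // kraft.version defaults to 0 when no KRaftVersionRecord has been read.
        short kraftVersion = 0;
        Map.Entry<Long, Short> version = kraftVersions.floorEntry(lastIncludedOffset);
        if (version != null) {
            kraftVersion = version.getValue();
        }
        if (kraftVersion > 0) {
            records.add("KRaftVersionRecord");
        }

        // Include the latest voter set at or before the snapshot's offset, if one exists.
        Map.Entry<Long, Set<Integer>> voters = voterSets.floorEntry(lastIncludedOffset);
        if (voters != null) {
            records.add("VotersRecord");
        }
        return records;
    }
}
```

With empty histories the function selects nothing, which matches the statement that this commit does not change the data stored on disk while kraft.version remains 0.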

Reviewers: Jason Gustafson <[email protected]>
jsancio authored May 4, 2024
1 parent 9b8aac2 commit bfe81d6
Showing 73 changed files with 3,650 additions and 765 deletions.
3 changes: 2 additions & 1 deletion checkstyle/import-control.xml
@@ -456,12 +456,13 @@
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.fault"/>

@@ -28,7 +28,7 @@ public class SupportedVersionRange {

/**
* Raises an exception unless the following conditions are met:
* 1 <= minVersion <= maxVersion.
* 0 <= minVersion <= maxVersion.
*
* @param minVersion The minimum version value.
* @param maxVersion The maximum version value.

@@ -26,7 +26,7 @@
/**
* Represents an immutable basic version range using 2 attributes: min and max, each of type short.
* The min and max attributes need to satisfy 2 rules:
* - they are each expected to be >= 1, as we only consider positive version values to be valid.
* - they are each expected to be >= 0, as we only consider non-negative version values to be valid.
* - max should be >= min.
*
* The class also provides API to convert the version range to a map.
@@ -48,22 +48,22 @@ class BaseVersionRange {

/**
* Raises an exception unless the following condition is met:
* minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
* minValue >= 0 and maxValue >= 0 and maxValue >= minValue.
*
* @param minKeyLabel Label for the min version key, that's used only to convert to/from a map.
* @param minValue The minimum version value.
* @param maxKeyLabel Label for the max version key, that's used only to convert to/from a map.
* @param maxValue The maximum version value.
*
* @throws IllegalArgumentException If any of the following conditions are true:
* - (minValue < 1) OR (maxValue < 1) OR (maxValue < minValue).
* - (minValue < 0) OR (maxValue < 0) OR (maxValue < minValue).
* - minKeyLabel is empty, OR, maxKeyLabel is empty.
*/
protected BaseVersionRange(String minKeyLabel, short minValue, String maxKeyLabel, short maxValue) {
if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
if (minValue < 0 || maxValue < 0 || maxValue < minValue) {
throw new IllegalArgumentException(
String.format(
"Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received" +
"Expected minValue >= 0, maxValue >= 0 and maxValue >= minValue, but received" +
" minValue: %d, maxValue: %d", minValue, maxValue));
}
if (minKeyLabel.isEmpty()) {
@@ -86,6 +86,7 @@ public short max() {
return maxValue;
}

@Override
public String toString() {
return String.format(
"%s[%s]",

@@ -33,7 +33,7 @@ public SupportedVersionRange(short minVersion, short maxVersion) {
}

public SupportedVersionRange(short maxVersion) {
this((short) 1, maxVersion);
this((short) 0, maxVersion);
}

public static SupportedVersionRange fromMap(Map<String, Short> versionRangeMap) {

@@ -44,11 +44,15 @@ public enum ControlRecordType {
ABORT((short) 0),
COMMIT((short) 1),

// Raft quorum related control messages.
// KRaft quorum related control messages
LEADER_CHANGE((short) 2),
SNAPSHOT_HEADER((short) 3),
SNAPSHOT_FOOTER((short) 4),

// KRaft membership changes messages
KRAFT_VERSION((short) 5),
KRAFT_VOTERS((short) 6),

// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
UNKNOWN((short) -1);

@@ -108,6 +112,10 @@ public static ControlRecordType fromTypeId(short typeId) {
return SNAPSHOT_HEADER;
case 4:
return SNAPSHOT_FOOTER;
case 5:
return KRAFT_VERSION;
case 6:
return KRAFT_VOTERS;

default:
return UNKNOWN;

@@ -16,9 +16,11 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import java.nio.ByteBuffer;
@@ -27,49 +29,77 @@
* Utility class for easy interaction with control records.
*/
public class ControlRecordUtils {
public static final short KRAFT_VERSION_CURRENT_VERSION = 0;
public static final short LEADER_CHANGE_CURRENT_VERSION = 0;
public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
public static final short SNAPSHOT_FOOTER_CURRENT_VERSION = 0;
public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
public static final short KRAFT_VOTERS_CURRENT_VERSION = 0;

public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.LEADER_CHANGE) {
throw new IllegalArgumentException(
"Expected LEADER_CHANGE control record type(2), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.LEADER_CHANGE, recordType);

return deserializeLeaderChangeMessage(record.value());
}

public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
return new LeaderChangeMessage(new ByteBufferAccessor(data.slice()), LEADER_CHANGE_CURRENT_VERSION);
}

public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_HEADER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.SNAPSHOT_HEADER, recordType);

return deserializeSnapshotHeaderRecord(record.value());
}

public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
return new SnapshotHeaderRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_HEADER_CURRENT_VERSION);
}

public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_FOOTER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.SNAPSHOT_FOOTER, recordType);

return deserializeSnapshotFooterRecord(record.value());
}

public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
return new SnapshotFooterRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_FOOTER_CURRENT_VERSION);
}

public static KRaftVersionRecord deserializeKRaftVersionRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
validateControlRecordType(ControlRecordType.KRAFT_VERSION, recordType);

return deserializeKRaftVersionRecord(record.value());
}

public static KRaftVersionRecord deserializeKRaftVersionRecord(ByteBuffer data) {
return new KRaftVersionRecord(new ByteBufferAccessor(data.slice()), KRAFT_VERSION_CURRENT_VERSION);
}

public static VotersRecord deserializeVotersRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
validateControlRecordType(ControlRecordType.KRAFT_VOTERS, recordType);

return deserializeVotersRecord(record.value());
}

public static VotersRecord deserializeVotersRecord(ByteBuffer data) {
return new VotersRecord(new ByteBufferAccessor(data.slice()), KRAFT_VOTERS_CURRENT_VERSION);
}

private static void validateControlRecordType(ControlRecordType expected, ControlRecordType actual) {
if (actual != expected) {
throw new IllegalArgumentException(
String.format(
"Expected %s control record type(%d), but found %s",
expected,
expected.type(),
actual
)
);
}
}
}
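
The type-id mapping and the shared validation helper in the two files above can be exercised in isolation with a standalone stand-in. This is a sketch for illustration only: `ControlType` and `ControlTypeCheck` are hypothetical names that mirror the ids and the error message format shown in the diff, and they do not depend on the Kafka classes.

```java
// Hypothetical stand-in for the control record types listed in this commit.
enum ControlType {
    ABORT(0),
    COMMIT(1),
    LEADER_CHANGE(2),
    SNAPSHOT_HEADER(3),
    SNAPSHOT_FOOTER(4),
    KRAFT_VERSION(5),
    KRAFT_VOTERS(6),
    UNKNOWN(-1);

    private final short id;

    ControlType(int id) {
        this.id = (short) id;
    }

    short id() {
        return id;
    }

    // Ids that this reader does not know about map to UNKNOWN.
    static ControlType fromId(short id) {
        for (ControlType type : values()) {
            if (type != UNKNOWN && type.id == id) {
                return type;
            }
        }
        return UNKNOWN;
    }
}

final class ControlTypeCheck {
    // One helper replaces the per-record checks, so every message has the same shape.
    static void validate(ControlType expected, ControlType actual) {
        if (actual != expected) {
            throw new IllegalArgumentException(
                String.format(
                    "Expected %s control record type(%d), but found %s",
                    expected,
                    expected.id(),
                    actual
                )
            );
        }
    }
}
```

Mapping unrecognized ids to `UNKNOWN` rather than throwing lets an older reader skip control records introduced later, which is the behavior the `UNKNOWN` comment in the enum describes.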

@@ -18,9 +18,11 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
@@ -154,7 +156,7 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte

/**
* Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
* to the delete horizon of the tombstones or txn markers which are present in the batch.
* to the delete horizon of the tombstones or txn markers which are present in the batch.
*/
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
ByteBuffer buffer,
LeaderChangeMessage leaderChangeMessage
) {
writeLeaderChangeMessage(buffer, initialOffset, timestamp, leaderEpoch, leaderChangeMessage);
buffer.flip();
return MemoryRecords.readableRecords(buffer);
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
initialOffset,
timestamp,
leaderEpoch,
buffer
)
) {
builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
return builder.build();
}
}

private static void writeLeaderChangeMessage(
ByteBuffer buffer,
public static MemoryRecords withSnapshotHeaderRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
LeaderChangeMessage leaderChangeMessage
ByteBuffer buffer,
SnapshotHeaderRecord snapshotHeaderRecord
) {
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, initialOffset, timestamp,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, true, leaderEpoch, buffer.capacity())
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
initialOffset,
timestamp,
leaderEpoch,
buffer
)
) {
builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord);
return builder.build();
}
}

public static MemoryRecords withSnapshotHeaderRecord(
public static MemoryRecords withSnapshotFooterRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
SnapshotHeaderRecord snapshotHeaderRecord
SnapshotFooterRecord snapshotFooterRecord
) {
writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord);
buffer.flip();
return MemoryRecords.readableRecords(buffer);
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
initialOffset,
timestamp,
leaderEpoch,
buffer
)
) {
builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
return builder.build();
}
}

private static void writeSnapshotHeaderRecord(
ByteBuffer buffer,
public static MemoryRecords withKRaftVersionRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
SnapshotHeaderRecord snapshotHeaderRecord
ByteBuffer buffer,
KRaftVersionRecord kraftVersionRecord
) {
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, initialOffset, timestamp,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, true, leaderEpoch, buffer.capacity())
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
initialOffset,
timestamp,
leaderEpoch,
buffer
)
) {
builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord);
builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
return builder.build();
}
}

public static MemoryRecords withSnapshotFooterRecord(
public static MemoryRecords withVotersRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
SnapshotFooterRecord snapshotFooterRecord
VotersRecord votersRecord
) {
writeSnapshotFooterRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotFooterRecord);
buffer.flip();
return MemoryRecords.readableRecords(buffer);
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
initialOffset,
timestamp,
leaderEpoch,
buffer
)
) {
builder.appendVotersMessage(timestamp, votersRecord);
return builder.build();
}
}

private static void writeSnapshotFooterRecord(
ByteBuffer buffer,
private static MemoryRecordsBuilder createKraftControlReccordBuilder(
long initialOffset,
long timestamp,
int leaderEpoch,
SnapshotFooterRecord snapshotFooterRecord
ByteBuffer buffer
) {
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, initialOffset, timestamp,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, true, leaderEpoch, buffer.capacity())
) {
builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
}
return new MemoryRecordsBuilder(
buffer,
RecordBatch.CURRENT_MAGIC_VALUE,
CompressionType.NONE,
TimestampType.CREATE_TIME,
initialOffset,
timestamp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE,
false,
true,
leaderEpoch,
buffer.capacity()
);
}
}