KAFKA-16505: Fix lost source raw key and value in store caches and buffers #18739

Open — wants to merge 1 commit into base: trunk
27 changes: 27 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.utils;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Optional;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -314,6 +317,30 @@ public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
return getNullableArray(buffer, size);
}

/**
* Starting from the current position, read an optional marker char indicating that a size-prefixed
* byte array follows. If the marker is present, read the size of the byte array, then read the array.
* If the marker is not present, reset the buffer to the saved position and return an empty Optional.
* @param marker The marker char indicating the presence of a byte array
* @param buffer The buffer to read an optional size-prefixed array from
* @return An Optional containing the array if the marker was present, otherwise an empty Optional
*/
public static Optional<byte[]> getOptionalField(final char marker, final ByteBuffer buffer) {
if (buffer.remaining() < Character.BYTES) {
return Optional.empty();
}

buffer.mark();

char serializedMarker = buffer.getChar();
if (serializedMarker == marker) {
return Optional.of(getNullableSizePrefixedArray(buffer));
}

buffer.reset(); // marker is not present, reset the buffer to the saved position
return Optional.empty();
}
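
Note (illustrative, not part of the diff): a minimal usage sketch of getOptionalField, assuming the hypothetical demo class below. The 'k' marker and size-prefixed layout mirror the serializer changes later in this PR.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

import org.apache.kafka.common.utils.Utils;

public class OptionalFieldDemo {
    public static void main(final String[] args) {
        final byte[] payload = {1, 2, 3};
        // Write the layout getOptionalField expects: marker char + size-prefixed bytes.
        final ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES + Integer.BYTES + payload.length);
        buffer.putChar('k');            // marker: field is present
        buffer.putInt(payload.length);  // size prefix read by getNullableSizePrefixedArray
        buffer.put(payload);
        buffer.flip();

        // Marker matches: the array is consumed and returned.
        final Optional<byte[]> present = Utils.getOptionalField('k', buffer); // Optional.of({1, 2, 3})
        // Buffer is exhausted now, so the lookup is empty and the position is untouched.
        final Optional<byte[]> absent = Utils.getOptionalField('v', buffer);  // Optional.empty()
        System.out.println(present.get().length + " / " + absent.isPresent()); // 3 / false
    }
}
```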

/**
* Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
* is after the array that is returned.
@@ -422,6 +422,10 @@ private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), context.sourceRawKey())
|| Arrays.equals("ID123-5-ERR".getBytes(), context.sourceRawKey()));
assertTrue(Arrays.equals("ID123-A2".getBytes(), context.sourceRawValue())
|| Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
}
@@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
* @return The timestamp.
*/
long timestamp();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be the one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception).
*
* @return the raw bytes of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be the one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception).
*
* @return the raw bytes of the value of the source message
*/
byte[] sourceRawValue();
}
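
As a usage illustration (this handler class and its logging choices are hypothetical, not part of the PR), a ProcessingExceptionHandler could inspect the raw source bytes like so:

```java
import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical handler that logs the raw source bytes before skipping the record.
public class RawBytesLoggingHandler implements ProcessingExceptionHandler {
    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        final byte[] rawKey = context.sourceRawKey();     // may be null (e.g. punctuation-triggered)
        final byte[] rawValue = context.sourceRawValue(); // may be null (e.g. punctuation-triggered)
        System.err.printf("processing failed at %s-%d@%d: rawKey=%d bytes, rawValue=%d bytes%n",
                context.topic(), context.partition(), context.offset(),
                rawKey == null ? -1 : rawKey.length,
                rawValue == null ? -1 : rawValue.length);
        return ProcessingHandlerResponse.CONTINUE; // drop the poisonous record and move on
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```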
@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;

private final long timestamp;
private final ProcessorContext processorContext;
@@ -44,7 +46,9 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final Headers headers,
final String processorNodeId,
final TaskId taskId,
final long timestamp) {
final long timestamp,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
@@ -53,6 +57,8 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
this.taskId = taskId;
this.processorContext = processorContext;
this.timestamp = timestamp;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}

@Override
@@ -90,6 +96,14 @@ public long timestamp() {
return timestamp;
}

public byte[] sourceRawKey() {
return sourceRawKey;
}

public byte[] sourceRawValue() {
return sourceRawValue;
}

@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data
@@ -110,4 +110,37 @@ public interface RecordContext {
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be the one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception).
*
* @return the raw bytes of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be the one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception).
*
* @return the raw bytes of the value of the source message
*/
byte[] sourceRawValue();
}
@@ -260,7 +260,10 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers());
record.headers(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);
}

if (childName == null) {
@@ -215,7 +215,9 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.recordContext().headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
internalProcessorContext.recordContext().timestamp()
internalProcessorContext.recordContext().timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue()
);

final ProcessingExceptionHandler.ProcessingHandlerResponse response;
@@ -16,27 +16,30 @@
*/
package org.apache.kafka.streams.processor.internals;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
import static org.apache.kafka.common.utils.Utils.getOptionalField;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;

import java.nio.ByteBuffer;
import java.util.Objects;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;

public class ProcessorRecordContext implements RecordContext, RecordMetadata {

private final long timestamp;
private final long offset;
private final String topic;
private final int partition;
private final Headers headers;
private byte[] sourceRawKey;
private byte[] sourceRawValue;

public ProcessorRecordContext(final long timestamp,
final long offset,
@@ -48,6 +51,24 @@ public ProcessorRecordContext(final long timestamp,
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = null;
this.sourceRawValue = null;
Comment on lines +54 to +55
Member:
You also need to add this information to serialize() and deserialize() so that the buffered values also carry the source record. It gets a bit tricky here, because you need to consider the case where a serialized record context does not contain the source record because it was written by a version of Streams that did not yet include the source record in the context.

Contributor (Author):
Indeed, having an "optional" raw key and value makes the deserialization tricky.

Let's say we serialize the ProcessorRecordContext in the order timestamp, offset, topic, partition, headers, rawKey, rawValue. After deserializing the headers, the next bytes can be the rawKey and rawValue, or they can be something else entirely (e.g., priorValue, read via final byte[] priorValue = getNullableSizePrefixedArray(buffer);).

Right now I'm considering serializing the rawKey and rawValue at the very end of the ByteBuffer. Thus, after deserializing all the non-optional fields, if there are bytes remaining in the buffer, they should be the rawKey and rawValue.
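
Sketched as a comment, the resulting wire layout and why the marker plus mark()/reset() scheme stays backward compatible (byte widths in parentheses; this summarizes the diff below and is not authoritative):

```java
// Serialized ProcessorRecordContext, with the new fields strictly trailing:
//
//   timestamp(8) | offset(8) | topic(size-prefixed) | partition(4)
//   | headers(count-prefixed)
//   | 'k'(2) keyLen(4) keyBytes       <- written only when sourceRawKey != null
//   | 'v'(2) valueLen(4) valueBytes   <- written only when sourceRawValue != null
//
// Two cases the reader has to survive:
//  1. Bytes written by an older Streams version end right after the headers,
//     so buffer.remaining() < Character.BYTES and getOptionalField returns empty.
//  2. The context is embedded in a larger buffer (e.g. buffer entries where
//     priorValue follows): the next char is not 'k'/'v', so the buffer is
//     reset and the enclosing deserializer keeps reading from the same position.
```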

}

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}

@Override
@@ -75,6 +96,16 @@ public Headers headers() {
return headers;
}

@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
@@ -124,6 +155,18 @@ public byte[] serialize() {
headerValuesBytes[i] = valueBytes;
}

if (sourceRawKey != null) {
size += Character.BYTES; // marker for sourceRawKey being present
size += Integer.BYTES; // size of sourceRawKey
size += sourceRawKey.length;
}

if (sourceRawValue != null) {
size += Character.BYTES; // marker for sourceRawValue being present
size += Integer.BYTES; // size of sourceRawValue
size += sourceRawValue.length;
}

final ByteBuffer buffer = ByteBuffer.allocate(size);
buffer.putLong(timestamp);
buffer.putLong(offset);
@@ -146,6 +189,18 @@ }
}
}

if (sourceRawKey != null) {
buffer.putChar('k');
buffer.putInt(sourceRawKey.length);
buffer.put(sourceRawKey);
}

if (sourceRawValue != null) {
buffer.putChar('v');
buffer.putInt(sourceRawValue.length);
buffer.put(sourceRawValue);
}

return buffer.array();
}

@@ -173,7 +228,15 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
headers = new RecordHeaders(headerArr);
}

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
final byte[] rawKey = getOptionalField('k', buffer).orElse(null);
final byte[] rawValue = getOptionalField('v', buffer).orElse(null);

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, rawKey, rawValue);
}
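
Note (illustrative only; ProcessorRecordContext is an internal class): a round trip through the new serialize()/deserialize() paths, assuming the hypothetical demo class below.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

public class ContextRoundTripDemo {
    public static void main(final String[] args) {
        final ProcessorRecordContext original = new ProcessorRecordContext(
                42L,                       // timestamp
                7L,                        // offset
                0,                         // partition
                "input-topic",             // topic
                new RecordHeaders(),
                "key".getBytes(UTF_8),     // sourceRawKey
                "value".getBytes(UTF_8));  // sourceRawValue

        final byte[] bytes = original.serialize();
        final ProcessorRecordContext restored =
                ProcessorRecordContext.deserialize(ByteBuffer.wrap(bytes));

        // The raw key/value survive the round trip; equals() now compares them too.
        System.out.println(restored.sourceRawKey().length); // 3
        System.out.println(original.equals(restored));      // true
    }
}
```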

public void freeRawRecord() {
this.sourceRawKey = null;
this.sourceRawValue = null;
}

@Override
@@ -189,7 +252,9 @@ public boolean equals(final Object o) {
offset == that.offset &&
partition == that.partition &&
Objects.equals(topic, that.topic) &&
Objects.equals(headers, that.headers);
Objects.equals(headers, that.headers) &&
Arrays.equals(sourceRawKey, that.sourceRawKey) &&
Arrays.equals(sourceRawValue, that.sourceRawValue);
}

@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,

final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

// Many records can be in flight at once, so free the raw source records
// held in the context to reduce memory pressure
freeRawInputRecordFromContext(context);

streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging offsets or new exceptions
@@ -311,6 +315,12 @@ public <K, V> void send(final String topic,
});
}

private static void freeRawInputRecordFromContext(final InternalProcessorContext<Void, Void> context) {
if (context != null && context.recordContext() != null) {
context.recordContext().freeRawRecord();
}
}
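
A rough back-of-envelope for the memory-pressure comment above, with purely assumed numbers (none of these figures come from the PR):

```java
public class MemoryPressureSketch {
    public static void main(final String[] args) {
        // Assumed for illustration: ~10 KB of raw key+value per record, and
        // 50,000 records sent but not yet acknowledged by the producer.
        final long rawBytesPerRecord = 10_000L;
        final long inFlightRecords = 50_000L;
        final long pinnedWithoutFree = rawBytesPerRecord * inFlightRecords;
        System.out.printf("raw bytes pinned without freeing: ~%d MB%n",
                pinnedWithoutFree / (1024 * 1024)); // ~476 MB
        // freeRawRecord() nulls the byte[] references once the record has been
        // serialized, so those arrays become collectible while sends are in flight.
    }
}
```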

private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final K key,
@@ -388,7 +398,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext
recordContext.headers(),
processorNodeId,
taskId,
recordContext.timestamp()
recordContext.timestamp(),
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
) :
new DefaultErrorHandlerContext(
context,
@@ -398,7 +410,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext
new RecordHeaders(),
processorNodeId,
taskId,
-1L
-1L,
null,
null
);
}

@@ -95,7 +95,10 @@ public static void handleDeserializationFailure(final DeserializationExceptionHandler
rawRecord.headers(),
sourceNodeName,
processorContext.taskId(),
rawRecord.timestamp());
rawRecord.timestamp(),
rawRecord.key(),
rawRecord.value()
);

final DeserializationHandlerResponse response;
try {