Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16505: Fix source raw key and value in store caches #18739

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerCon
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), context.sourceRawKey())
|| Arrays.equals("ID123-5-ERR".getBytes(), context.sourceRawKey()));
assertTrue(Arrays.equals("ID123-A2".getBytes(), context.sourceRawValue())
|| Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
* @return The timestamp.
*/
long timestamp();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;

private final long timestamp;
private final ProcessorContext processorContext;
Expand All @@ -44,7 +46,9 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final Headers headers,
final String processorNodeId,
final TaskId taskId,
final long timestamp) {
final long timestamp,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
Expand All @@ -53,6 +57,8 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
this.taskId = taskId;
this.processorContext = processorContext;
this.timestamp = timestamp;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}

@Override
Expand Down Expand Up @@ -90,6 +96,14 @@ public long timestamp() {
return timestamp;
}

/**
 * Return the raw (non-deserialized) key bytes that were handed to this context's constructor.
 *
 * @return the stored key byte array, returned as-is without copying; may be {@code null}
 */
@Override
public byte[] sourceRawKey() {
    return sourceRawKey;
}

/**
 * Return the raw (non-deserialized) value bytes that were handed to this context's constructor.
 *
 * @return the stored value byte array, returned as-is without copying; may be {@code null}
 */
@Override
public byte[] sourceRawValue() {
    return sourceRawValue;
}

@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,38 @@ public interface RecordContext {
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();

}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers());
record.headers(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);
}

if (childName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.recordContext().headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
internalProcessorContext.recordContext().timestamp()
);
internalProcessorContext.recordContext().timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue());

final ProcessingExceptionHandler.ProcessingHandlerResponse response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private byte[] sourceRawKey;
private byte[] sourceRawValue;

public ProcessorRecordContext(final long timestamp,
final long offset,
Expand All @@ -48,6 +50,24 @@ public ProcessorRecordContext(final long timestamp,
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = null;
this.sourceRawValue = null;
Comment on lines +53 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to add this information to serialize() and deserialize() so that the buffer values also get the source record. Here it gets a bit tricky, because you need to consider the case where a serialized record context does not contain the source record because it was written by a version of Streams that did not yet have the source record in the context.

}

/**
 * Create a record context that additionally carries the raw key and value bytes
 * of the source record.
 *
 * @param timestamp      the record timestamp
 * @param offset         the record offset
 * @param partition      the record partition
 * @param topic          the record topic
 * @param headers        the record headers; must not be {@code null}
 * @param sourceRawKey   the raw key bytes of the source record; stored as given
 * @param sourceRawValue the raw value bytes of the source record; stored as given
 * @throws NullPointerException if {@code headers} is {@code null}
 */
public ProcessorRecordContext(final long timestamp,
                              final long offset,
                              final int partition,
                              final String topic,
                              final Headers headers,
                              final byte[] sourceRawKey,
                              final byte[] sourceRawValue) {
    // validate the only argument with a non-null requirement first
    this.headers = Objects.requireNonNull(headers);

    // record coordinates
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestamp = timestamp;

    // raw source record payload
    this.sourceRawValue = sourceRawValue;
    this.sourceRawKey = sourceRawKey;
}

@Override
Expand Down Expand Up @@ -75,6 +95,16 @@ public Headers headers() {
return headers;
}

@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
Expand Down Expand Up @@ -176,6 +206,11 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}

/**
 * Drop the references to the raw source key and value held by this context.
 * After this call, {@link #sourceRawKey()} and {@link #sourceRawValue()} return {@code null}.
 */
public void freeRawRecord() {
    sourceRawValue = null;
    sourceRawKey = null;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ public <K, V> void send(final String topic,

final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

// As many records could be in-flight,
// freeing raw records in the context to reduce memory pressure
freeContext(context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this method is a bit misleading. It basically frees the raw record within the context, not the whole context. What about calling it freeRawInputRecordFromContext()?


streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging offsets or new exceptions
Expand Down Expand Up @@ -311,6 +315,12 @@ public <K, V> void send(final String topic,
});
}

/**
 * Release the raw source record held by the given context's record context, if there is one.
 * Does nothing when either the context or its record context is {@code null}.
 *
 * @param context the processor context whose record context should drop its raw record; may be {@code null}
 */
private static void freeContext(final InternalProcessorContext<Void, Void> context) {
    if (context == null || context.recordContext() == null) {
        return;
    }
    context.recordContext().freeRawRecord();
}

private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final K key,
Expand Down Expand Up @@ -388,7 +398,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
recordContext.headers(),
processorNodeId,
taskId,
recordContext.timestamp()
recordContext.timestamp(),
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
) :
new DefaultErrorHandlerContext(
context,
Expand All @@ -398,7 +410,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
new RecordHeaders(),
processorNodeId,
taskId,
-1L
-1L,
null,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
rawRecord.headers(),
sourceNodeName,
processorContext.taskId(),
rawRecord.timestamp());
rawRecord.timestamp(),
rawRecord.key(),
rawRecord.value());

final DeserializationHandlerResponse response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void updateHead() {
lastCorruptedRecord = raw;
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value());
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@

public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {

private final byte[] rawKey;
private final byte[] rawValue;

/**
 * Create a stamped record without raw key/value bytes; both raw fields are {@code null}.
 *
 * @param record    the consumer record to wrap
 * @param timestamp the timestamp associated with the record
 */
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
    this(record, timestamp, null, null);
}

/**
 * Create a stamped record that also keeps the raw key and value bytes.
 *
 * @param record    the consumer record to wrap
 * @param timestamp the timestamp associated with the record
 * @param rawKey    the raw key bytes; stored as given
 * @param rawValue  the raw value bytes; stored as given
 */
public StampedRecord(final ConsumerRecord<?, ?> record,
                     final long timestamp,
                     final byte[] rawKey,
                     final byte[] rawValue) {
    super(record, timestamp);
    this.rawValue = rawValue;
    this.rawKey = rawKey;
}

public String topic() {
Expand Down Expand Up @@ -55,8 +69,26 @@ public Headers headers() {
return value.headers();
}

/**
 * @return the raw key bytes given at construction, or {@code null} if none were supplied
 */
public byte[] rawKey() {
    return this.rawKey;
}

/**
 * @return the raw value bytes given at construction, or {@code null} if none were supplied
 */
public byte[] rawValue() {
    return this.rawValue;
}

/**
 * Render the wrapped record followed by its timestamp.
 * The raw key and value bytes are not part of the output.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder(value.toString());
    text.append(", timestamp = ").append(timestamp);
    return text.toString();
}

/**
 * Delegates entirely to the superclass comparison; {@code rawKey} and {@code rawValue}
 * are not compared here.
 */
// NOTE(review): this override adds no behavior of its own; presumably it exists to satisfy a
// static-analysis rule about subclasses that add fields -- confirm, otherwise it can be removed.
@Override
public boolean equals(final Object other) {
return super.equals(other);
}

/**
 * Delegates entirely to the superclass hash; {@code rawKey} and {@code rawValue}
 * do not contribute to it.
 */
// NOTE(review): this override adds no behavior of its own; presumably it is paired with the
// equals override above for the same reason -- confirm, otherwise it can be removed.
@Override
public int hashCode() {
return super.hashCode();
}
Comment on lines +84 to +93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are those needed?

}
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,9 @@ private void doProcess(final long wallClockTime) {
record.offset(),
record.partition(),
record.topic(),
record.headers()
record.headers(),
record.rawKey(),
record.rawValue()
);
updateProcessorContext(currNode, wallClockTime, recordContext);

Expand Down Expand Up @@ -935,7 +937,9 @@ record = null;
recordContext.headers(),
node.name(),
id(),
recordContext.timestamp()
recordContext.timestamp(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);

final ProcessingExceptionHandler.ProcessingHandlerResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ private void putInternal(final Bytes key,
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic()
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public void put(final Windowed<Bytes> key, final byte[] value) {
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic()
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public synchronized void put(final Bytes key,
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic()
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry);

Expand Down
Loading