Skip to content

Commit

Permalink
Allow ProcessingResult to wrap record with headers (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLive authored Feb 14, 2025
1 parent 23db0a3 commit 59078fa
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.michelin.kstreamplify.error;

import lombok.Getter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Record;

/**
Expand Down Expand Up @@ -106,6 +107,47 @@ public static <K, V, V2> Record<K, ProcessingResult<V, V2>> wrapRecordSuccess(K
return new Record<>(key, ProcessingResult.success(value), timestamp);
}

/**
* Wraps a key, value, timestamp and headers in a Record with ProcessingResult#success(V value) as value.
* The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream)
* for automatic DLQ redirection on failed records.
*
* @param key The key to put in the resulting record
* @param value The successful value to put in the resulting record
* @param timestamp The timestamp to apply on the resulting record
* @param headers The headers values to put in the resulting record
* @param <K> The type of the record key
* @param <V> The type of the ProcessingResult successful value
* @param <V2> The type of the ProcessingResult error value
* @return A Record with value wrapped in a {@link ProcessingResult}
*/
public static <K, V, V2> Record<K, ProcessingResult<V, V2>> wrapRecordSuccess(K key,
V value,
long timestamp,
Headers headers) {
return new Record<>(key, ProcessingResult.success(value), timestamp, headers);
}

/**
* Wraps a record's value and the headers with ProcessingResult.success(V value).
* The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream)
* for automatic DLQ redirection on failed records.
*
* @param message The resulting successful Record from the processor that needs to be wrapped in a ProcessingResult
* @param <K> The type of the record key
* @param <V> The type of the ProcessingResult successful value
* @param <V2> The type of the ProcessingResult error value
* @return The initial Record, with value wrapped in a ProcessingResult
*/
public static <K, V, V2> Record<K, ProcessingResult<V, V2>> wrapRecordSuccessWithHeaders(Record<K, V> message) {
return new Record<>(
message.key(),
ProcessingResult.success(message.value()),
message.timestamp(),
message.headers()
);
}

/**
* Create a failed processing result.
* If you are using this in a Processor, refer to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Record;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -56,6 +61,65 @@ void shouldCreateWrappedProcessingResult() {
assertEquals(message.timestamp(), wrappedRecord.timestamp());
}

@Test
void shouldWrapRecordSuccessWithHeadersFromRecordType() {
String value = "Value";
String headerKey = "MSG_HEADER";
String headerValue = "Header value";
long timestamp = System.currentTimeMillis();

Headers headers = new RecordHeaders(Collections.singletonList(
new RecordHeader(headerKey, headerValue.getBytes(StandardCharsets.UTF_8))));

Record<String, String> message = new Record<>("key", value, timestamp, headers);
Record<String, ProcessingResult<String, Integer>> wrappedRecord =
ProcessingResult.wrapRecordSuccessWithHeaders(message);
// check key
assertEquals(message.key(), wrappedRecord.key());
// check header
assertEquals(1, wrappedRecord.headers().toArray().length);
assertEquals(
new String(headerValue.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8),
new String(wrappedRecord.headers().lastHeader(headerKey).value(), StandardCharsets.UTF_8)
);
// Check value
assertNotNull(wrappedRecord.value());
assertTrue(wrappedRecord.value().isValid());
assertEquals(value, wrappedRecord.value().getValue());
assertNull(wrappedRecord.value().getError());
// Check timestamp
assertEquals(message.timestamp(), wrappedRecord.timestamp());
}

@Test
void shouldWrapRecordSuccessWithHeadersFromParameters() {
String value = "Value";
String headerKey = "MSG_HEADER";
String headerValue = "Header value";
long timestamp = System.currentTimeMillis();

Headers headers = new RecordHeaders(Collections.singletonList(
new RecordHeader(headerKey, headerValue.getBytes(StandardCharsets.UTF_8))));

Record<String, ProcessingResult<String, Integer>> wrappedRecord =
ProcessingResult.wrapRecordSuccess("key", value, timestamp, headers);
// check key
assertEquals("key", wrappedRecord.key());
// check header
assertEquals(1, wrappedRecord.headers().toArray().length);
assertEquals(
new String(headerValue.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8),
new String(wrappedRecord.headers().lastHeader(headerKey).value(), StandardCharsets.UTF_8)
);
// Check value
assertNotNull(wrappedRecord.value());
assertTrue(wrappedRecord.value().isValid());
assertEquals(value, wrappedRecord.value().getValue());
assertNull(wrappedRecord.value().getError());
// Check timestamp
assertEquals(timestamp, wrappedRecord.timestamp());
}

@Test
void shouldCreateFailedProcessingResult() {
String failedRecordValue = "Failed Value";
Expand Down

0 comments on commit 59078fa

Please sign in to comment.