Skip to content

Commit 0e9cde3

Browse files
authored
Upgrade to Kafka 4.1 (#73)
1 parent 40fc69c commit 0e9cde3

File tree

6 files changed

+17
-13
lines changed

6 files changed

+17
-13
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ allprojects {
1616
repositories {
1717
mavenCentral()
1818
maven(url = "https://packages.confluent.io/maven/")
19-
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
19+
maven(url = "https://central.sonatype.com/repository/maven-snapshots")
2020
}
2121
}
2222

error-handling-avro/build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ dependencies {
2626
testImplementation(libs.mockito.junit)
2727
testImplementation(libs.assertj)
2828
testImplementation(libs.log4j.slf4j2)
29-
testImplementation(libs.kafka.streams.avro.serde)
29+
testImplementation(libs.kafka.streams.avro.serde) {
30+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
31+
}
3032
}
3133

3234
avro {

error-handling-core/src/main/java/com/bakdata/kafka/ErrorCapturingProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public final class ErrorCapturingProcessor<K, V, KR, VR>
6868
* @see ErrorUtil#isRecoverable(Exception)
6969
*/
7070
public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
71-
final @NonNull Processor<? super K, ? super V, KR, VR> processor) {
71+
final @NonNull Processor<? super K, ? super V, ? extends KR, ? extends VR> processor) {
7272
return captureErrors(processor, ErrorUtil::isRecoverable);
7373
}
7474

@@ -91,7 +91,7 @@ public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> ca
9191
* @return {@code Processor}
9292
*/
9393
public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
94-
final @NonNull Processor<? super K, ? super V, KR, VR> processor,
94+
final @NonNull Processor<? super K, ? super V, ? extends KR, ? extends VR> processor,
9595
final @NonNull Predicate<Exception> errorFilter) {
9696
return new ErrorCapturingProcessor<>((Processor<K, V, KR, VR>) processor, errorFilter);
9797
}
@@ -110,7 +110,7 @@ public static <K, V, KR, VR> Processor<K, V, KR, ProcessedKeyValue<K, V, VR>> ca
110110
* @see ErrorUtil#isRecoverable(Exception)
111111
*/
112112
public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
113-
final @NonNull ProcessorSupplier<? super K, ? super V, KR, VR> supplier) {
113+
final @NonNull ProcessorSupplier<? super K, ? super V, ? extends KR, ? extends VR> supplier) {
114114
return captureErrors(supplier, ErrorUtil::isRecoverable);
115115
}
116116

@@ -134,7 +134,7 @@ public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V,
134134
* @return {@code ProcessorSupplier}
135135
*/
136136
public static <K, V, KR, VR> ProcessorSupplier<K, V, KR, ProcessedKeyValue<K, V, VR>> captureErrors(
137-
final @NonNull ProcessorSupplier<? super K, ? super V, KR, VR> supplier,
137+
final @NonNull ProcessorSupplier<? super K, ? super V, ? extends KR, ? extends VR> supplier,
138138
final @NonNull Predicate<Exception> errorFilter) {
139139
return new ProcessorSupplier<>() {
140140
@Override

error-handling-core/src/main/java/com/bakdata/kafka/ErrorCapturingValueProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public final class ErrorCapturingValueProcessor<K, V, VR>
6666
* @see ErrorUtil#isRecoverable(Exception)
6767
*/
6868
public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureErrors(
69-
final @NonNull FixedKeyProcessor<? super K, ? super V, VR> processor) {
69+
final @NonNull FixedKeyProcessor<? super K, ? super V, ? extends VR> processor) {
7070
return captureErrors(processor, ErrorUtil::isRecoverable);
7171
}
7272

@@ -88,7 +88,7 @@ public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureE
8888
* @return {@code FixedKeyProcessor}
8989
*/
9090
public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureErrors(
91-
final @NonNull FixedKeyProcessor<? super K, ? super V, VR> processor,
91+
final @NonNull FixedKeyProcessor<? super K, ? super V, ? extends VR> processor,
9292
final @NonNull Predicate<Exception> errorFilter) {
9393
return new ErrorCapturingValueProcessor<>((FixedKeyProcessor<K, V, VR>) processor, errorFilter);
9494
}
@@ -106,7 +106,7 @@ public static <K, V, VR> FixedKeyProcessor<K, V, ProcessedValue<V, VR>> captureE
106106
* @see ErrorUtil#isRecoverable(Exception)
107107
*/
108108
public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>> captureErrors(
109-
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, VR> supplier) {
109+
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, ? extends VR> supplier) {
110110
return captureErrors(supplier, ErrorUtil::isRecoverable);
111111
}
112112

@@ -129,7 +129,7 @@ public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>>
129129
* @return {@code FixedKeyProcessorSupplier}
130130
*/
131131
public static <K, V, VR> FixedKeyProcessorSupplier<K, V, ProcessedValue<V, VR>> captureErrors(
132-
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, VR> supplier,
132+
final @NonNull FixedKeyProcessorSupplier<? super K, ? super V, ? extends VR> supplier,
133133
final @NonNull Predicate<Exception> errorFilter) {
134134
return new FixedKeyProcessorSupplier<>() {
135135
@Override

error-handling-proto/build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ dependencies {
1717
testImplementation(libs.mockito.junit)
1818
testImplementation(libs.assertj)
1919
testImplementation(libs.log4j.slf4j2)
20-
testImplementation(libs.kafka.streams.protobuf.serde)
20+
testImplementation(libs.kafka.streams.protobuf.serde) {
21+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
22+
}
2123
}
2224

2325
protobuf {

gradle/libs.versions.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ junit = "5.14.0"
44
mockito = "5.20.0"
55

66
[libraries]
7-
kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version = "1.2.1" }
7+
kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version = "1.3.0" }
88
kafka-streams = { group = "org.apache.kafka", name = "kafka-streams" }
99
kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" }
1010
kafka-streams-protobuf-serde = { group = "io.confluent", name = "kafka-streams-protobuf-serde" }
@@ -21,7 +21,7 @@ junit-jupiter = { group = "org.junit.jupiter", name = "junit-jupiter", version.r
2121
assertj = { group = "org.assertj", name = "assertj-core", version = "3.27.6" }
2222
mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mockito" }
2323
mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version.ref = "mockito" }
24-
fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.4.1" }
24+
fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.0" }
2525
log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.2" }
2626

2727
[plugins]

0 commit comments

Comments
 (0)