2424
2525package com .bakdata .kafka .streams ;
2626
27- import static java .util .Collections .emptyList ;
28- import static java .util .Collections .emptyMap ;
29-
3027import com .bakdata .kafka .KafkaApplication ;
31- import com .bakdata .kafka .StringListConverter ;
32- import java . util . List ;
33- import java . util . Map ;
28+ import com .bakdata .kafka .mixin . ErrorOptions ;
29+ import com . bakdata . kafka . mixin . InputOptions ;
30+ import com . bakdata . kafka . mixin . OutputOptions ;
3431import java .util .Optional ;
35- import java .util .regex .Pattern ;
3632import lombok .Getter ;
3733import lombok .RequiredArgsConstructor ;
3834import lombok .Setter ;
3935import lombok .ToString ;
36+ import lombok .experimental .Delegate ;
4037import lombok .extern .slf4j .Slf4j ;
4138import org .apache .kafka .streams .KafkaStreams ;
4239import org .apache .kafka .streams .KafkaStreams .StateListener ;
4340import org .apache .kafka .streams .StreamsConfig ;
4441import org .apache .kafka .streams .errors .StreamsUncaughtExceptionHandler ;
4542import picocli .CommandLine ;
4643import picocli .CommandLine .Command ;
47- import picocli .CommandLine .UseDefaultConverter ;
48-
44+ import picocli .CommandLine .Mixin ;
4945
5046/**
5147 * <p>The base class for creating Kafka Streams applications.</p>
5248 * This class provides the following configuration options in addition to those provided by {@link KafkaApplication}:
5349 * <ul>
54- * <li>{@link #inputTopics}</li>
55- * <li>{@link #inputPattern}</li>
56- * <li>{@link #errorTopic}</li>
57- * <li>{@link #labeledInputTopics}</li>
58- * <li>{@link #labeledInputPatterns}</li>
50+ * <li>{@link #getInputTopics()}</li>
51+ * <li>{@link #getInputPattern()}</li>
52+ * <li>{@link #getLabeledInputTopics()}</li>
53+ * <li>{@link #getLabeledInputPatterns()}</li>
54+ * <li>{@link #getOutputTopic()}</li>
55+ * <li>{@link #getLabeledOutputTopics()}</li>
56+ * <li>{@link #getErrorTopic()}</li>
5957 * <li>{@link #volatileGroupInstanceId}</li>
6058 * </ul>
6159 * To implement your Kafka Streams application inherit from this class and add your custom options. Run it by
7270public abstract class KafkaStreamsApplication <T extends StreamsApp > extends
7371 KafkaApplication <StreamsRunner , StreamsCleanUpRunner , StreamsExecutionOptions ,
7472 ExecutableStreamsApp <T >, ConfiguredStreamsApp <T >, StreamsTopicConfig , T , StreamsAppConfiguration > {
75- @ CommandLine .Option (names = "--input-topics" , description = "Input topics" , split = "," )
76- private List <String > inputTopics = emptyList ();
77- @ CommandLine .Option (names = "--input-pattern" , description = "Input pattern" )
78- private Pattern inputPattern ;
79- @ CommandLine .Option (names = "--error-topic" , description = "Error topic" )
80- private String errorTopic ;
81- @ CommandLine .Option (names = "--labeled-input-topics" , split = "," , description = "Additional labeled input topics" ,
82- converter = {UseDefaultConverter .class , StringListConverter .class })
83- private Map <String , List <String >> labeledInputTopics = emptyMap ();
84- @ CommandLine .Option (names = "--labeled-input-patterns" , split = "," ,
85- description = "Additional labeled input patterns" )
86- private Map <String , Pattern > labeledInputPatterns = emptyMap ();
73+ @ Mixin
74+ @ Delegate
75+ private InputOptions inputOptions = new InputOptions ();
76+ @ Mixin
77+ @ Delegate
78+ private OutputOptions outputOptions = new OutputOptions ();
79+ @ Mixin
80+ @ Delegate
81+ private ErrorOptions errorOptions = new ErrorOptions ();
8782 @ CommandLine .Option (names = "--volatile-group-instance-id" , arity = "0..1" ,
8883 description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown." )
8984 private boolean volatileGroupInstanceId ;
9085 @ CommandLine .Option (names = "--application-id" ,
9186 description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing "
92- + "StreamsApp#getUniqueAppId()" )
87+ + "StreamsApp#getUniqueAppId()" )
9388 private String applicationId ;
9489
9590 /**
9691 * Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
9792 * topics associated with the Kafka Streams application.
9893 */
9994 @ Command (description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all "
100- + "output and intermediate topics associated with the Kafka Streams application." )
95+ + "output and intermediate topics associated with the Kafka Streams application." )
10196 @ Override
10297 public void clean () {
10398 super .clean ();
@@ -108,7 +103,7 @@ public void clean() {
108103 * application.
109104 */
110105 @ Command (description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
111- + "Kafka Streams application." )
106+ + "Kafka Streams application." )
112107 public void reset () {
113108 this .prepareClean ();
114109 try (final CleanableApp <StreamsCleanUpRunner > app = this .createCleanableApp ()) {
@@ -131,13 +126,13 @@ public final Optional<StreamsExecutionOptions> createExecutionOptions() {
131126 @ Override
132127 public final StreamsTopicConfig createTopicConfig () {
133128 return StreamsTopicConfig .builder ()
134- .inputTopics (this .inputTopics )
135- .labeledInputTopics (this .labeledInputTopics )
136- .inputPattern (this .inputPattern )
137- .labeledInputPatterns (this .labeledInputPatterns )
129+ .inputTopics (this .getInputTopics () )
130+ .labeledInputTopics (this .getLabeledInputTopics () )
131+ .inputPattern (this .getInputPattern () )
132+ .labeledInputPatterns (this .getLabeledInputPatterns () )
138133 .outputTopic (this .getOutputTopic ())
139134 .labeledOutputTopics (this .getLabeledOutputTopics ())
140- .errorTopic (this .errorTopic )
135+ .errorTopic (this .getErrorTopic () )
141136 .build ();
142137 }
143138
0 commit comments