diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java index 755530edb1b4b..3ddd00f823502 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java @@ -27,20 +27,18 @@ import java.util.Optional; import java.util.Set; -import static java.util.Collections.singleton; +class KStreamKTableJoin implements ProcessorSupplier { -class KStreamKTableJoin implements ProcessorSupplier { - - private final KeyValueMapper keyValueMapper = (key, value) -> key; - private final KTableValueGetterSupplier valueGetterSupplier; - private final ValueJoinerWithKey joiner; + private final KeyValueMapper keyValueMapper = (key, value) -> key; + private final KTableValueGetterSupplier valueGetterSupplier; + private final ValueJoinerWithKey joiner; private final boolean leftJoin; private final Optional gracePeriod; private final Optional storeName; private final Set> stores; - KStreamKTableJoin(final KTableValueGetterSupplier valueGetterSupplier, - final ValueJoinerWithKey joiner, + KStreamKTableJoin(final KTableValueGetterSupplier valueGetterSupplier, + final ValueJoinerWithKey joiner, final boolean leftJoin, final Optional gracePeriod, final Optional> bufferStoreBuilder) { @@ -59,7 +57,7 @@ public Set> stores() { } @Override - public Processor get() { + public Processor get() { return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index 903dba1452e94..9cc43dd3d7155 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -42,24 +42,24 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -class KStreamKTableJoinProcessor extends ContextualProcessor { +class KStreamKTableJoinProcessor extends ContextualProcessor { private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class); private final KTableValueGetter valueGetter; - private final KeyValueMapper keyMapper; - private final ValueJoinerWithKey joiner; + private final KeyValueMapper keyMapper; + private final ValueJoinerWithKey joiner; private final boolean leftJoin; private Sensor droppedRecordsSensor; private final Optional gracePeriod; - private TimeOrderedKeyValueBuffer buffer; + private TimeOrderedKeyValueBuffer buffer; protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; private InternalProcessorContext internalProcessorContext; private final boolean useBuffer; private final String storeName; KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, - final KeyValueMapper keyMapper, - final ValueJoinerWithKey joiner, + final KeyValueMapper keyMapper, + final ValueJoinerWithKey joiner, final boolean leftJoin, final Optional gracePeriod, final Optional storeName) { @@ -73,7 +73,7 @@ class KStreamKTableJoinProcessor extends ContextualP } @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); @@ -89,7 +89,7 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { + public void process(final Record record) { internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); updateObservedStreamTime(record.timestamp()); if (maybeDropRecord(record)) { @@ -107,8 +107,8 @@ public void process(final Record record) { } } - private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { - final Record record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp()) + private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { + final Record record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp()) .withHeaders(toEmit.recordContext().headers()); final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); try { @@ -124,7 +124,7 @@ protected void updateObservedStreamTime(final long timestamp) { } @SuppressWarnings("unchecked") - private void doJoin(final Record record) { + private void doJoin(final Record record) { final KTable mappedKey = keyMapper.apply(record.key(), record.value()); final VTable value2 = getValue2(record, mappedKey); if (leftJoin || value2 != null) { @@ -132,7 +132,7 @@ private void doJoin(final Record record) { } } - private VTable getValue2(final Record record, final KTable mappedKey) { + private VTable getValue2(final Record record, final KTable mappedKey) { if (mappedKey == null) return null; final ValueAndTimestamp valueAndTimestamp = valueGetter.isVersioned() ? valueGetter.get(mappedKey, record.timestamp()) @@ -140,7 +140,7 @@ private VTable getValue2(final Record record, final KTable mappedKey) { return getValueOrNull(valueAndTimestamp); } - private boolean maybeDropRecord(final Record record) { + private boolean maybeDropRecord(final Record record) { // we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we // cannot join and just ignore the record. Note for KTables, this is the same as having a null key // since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers