Skip to content

Commit

Permalink
generics
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jan 28, 2025
1 parent 72c955e commit bfc0311
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@
import java.util.Optional;
import java.util.Set;

import static java.util.Collections.singleton;
class KStreamKTableJoin<KStream, VStream, VTable, VOut> implements ProcessorSupplier<KStream, VStream, KStream, VOut> {

class KStreamKTableJoin<K, V, VTable, VOut> implements ProcessorSupplier<K, V, K, VOut> {

private final KeyValueMapper<K, V, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, VTable> valueGetterSupplier;
private final ValueJoinerWithKey<? super K, ? super V, ? super VTable, VOut> joiner;
private final KeyValueMapper<KStream, VStream, KStream> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<KStream, VTable> valueGetterSupplier;
private final ValueJoinerWithKey<? super KStream, ? super VStream, ? super VTable, VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<String> storeName;
private final Set<StoreBuilder<?>> stores;

KStreamKTableJoin(final KTableValueGetterSupplier<K, VTable> valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, VOut> joiner,
KStreamKTableJoin(final KTableValueGetterSupplier<KStream, VTable> valueGetterSupplier,
final ValueJoinerWithKey<? super KStream, ? super VStream, ? super VTable, VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<StoreBuilder<?>> bufferStoreBuilder) {
Expand All @@ -59,7 +57,7 @@ public Set<StoreBuilder<?>> stores() {
}

@Override
public Processor<K, V, K, VOut> get() {
public Processor<KStream, VStream, KStream, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,24 @@
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class KStreamKTableJoinProcessor<K, V, KTable, VTable, VOut> extends ContextualProcessor<K, V, K, VOut> {
class KStreamKTableJoinProcessor<KStream, VStream, KTable, VTable, VOut> extends ContextualProcessor<KStream, VStream, KStream, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);

private final KTableValueGetter<KTable, VTable> valueGetter;
private final KeyValueMapper<? super K, ? super V, ? extends KTable> keyMapper;
private final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner;
private final KeyValueMapper<? super KStream, ? super VStream, ? extends KTable> keyMapper;
private final ValueJoinerWithKey<? super KStream, ? super VStream, ? super VTable, ? extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
private final Optional<Duration> gracePeriod;
private TimeOrderedKeyValueBuffer<K, V, V> buffer;
private TimeOrderedKeyValueBuffer<KStream, VStream, VStream> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext internalProcessorContext;
private final boolean useBuffer;
private final String storeName;

KStreamKTableJoinProcessor(final KTableValueGetter<KTable, VTable> valueGetter,
final KeyValueMapper<? super K, ? super V, ? extends KTable> keyMapper,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final KeyValueMapper<? super KStream, ? super VStream, ? extends KTable> keyMapper,
final ValueJoinerWithKey<? super KStream, ? super VStream, ? super VTable, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<String> storeName) {
Expand All @@ -73,7 +73,7 @@ class KStreamKTableJoinProcessor<K, V, KTable, VTable, VOut> extends ContextualP
}

@Override
public void init(final ProcessorContext<K, VOut> context) {
public void init(final ProcessorContext<KStream, VOut> context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
Expand All @@ -89,7 +89,7 @@ public void init(final ProcessorContext<K, VOut> context) {
}

@Override
public void process(final Record<K, V> record) {
public void process(final Record<KStream, VStream> record) {
internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
Expand All @@ -107,8 +107,8 @@ public void process(final Record<K, V> record) {
}
}

private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, V> toEmit) {
final Record<K, V> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
private void emit(final TimeOrderedKeyValueBuffer.Eviction<KStream, VStream> toEmit) {
final Record<KStream, VStream> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
.withHeaders(toEmit.recordContext().headers());
final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
try {
Expand All @@ -124,23 +124,23 @@ protected void updateObservedStreamTime(final long timestamp) {
}

@SuppressWarnings("unchecked")
private void doJoin(final Record<K, V> record) {
private void doJoin(final Record<KStream, VStream> record) {
final KTable mappedKey = keyMapper.apply(record.key(), record.value());
final VTable value2 = getValue2(record, mappedKey);
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}
}

private VTable getValue2(final Record<K, V> record, final KTable mappedKey) {
private VTable getValue2(final Record<KStream, VStream> record, final KTable mappedKey) {
if (mappedKey == null) return null;
final ValueAndTimestamp<VTable> valueAndTimestamp = valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
return getValueOrNull(valueAndTimestamp);
}

private boolean maybeDropRecord(final Record<K, V> record) {
private boolean maybeDropRecord(final Record<KStream, VStream> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers
Expand Down

0 comments on commit bfc0311

Please sign in to comment.