Commit

generics
mjsax committed Jan 28, 2025
1 parent 3244855 commit 72c955e
Showing 4 changed files with 56 additions and 59 deletions.
@@ -2197,18 +2197,18 @@ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To customize the name of the changelog topic, use {@link Joined} input parameter.
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined);
+ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner,
+ final Joined<K, V, VTable> joined);

/**
* See {@link #join(KTable, ValueJoiner, Joined)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
- <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
- final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined);
+ <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
+ final Joined<K, V, VTable> joined);

/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
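For readers skimming the renamed signatures above, here is a minimal usage sketch of the stream-table join API (not part of this commit). Topic names, serdes, and the String/Long value types are illustrative assumptions; only KStream#join, Joined#with/withName, and the ValueJoiner/ValueJoinerWithKey shapes come from the interface shown in this diff.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topics: "orders" (customerId -> amount) and "customers" (customerId -> name).
        final KStream<String, Long> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));
        final KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // join(KTable<K, VTable>, ValueJoiner<? super V, ? super VTable, ? extends VOut>, Joined<K, V, VTable>)
        // with K = String, V = Long, VTable = String, VOut = String.
        final KStream<String, String> enriched = orders.join(
            customers,
            (amount, name) -> name + " spent " + amount,
            Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()).withName("orders-customers-join")
        );
        enriched.to("enriched-orders");

        // The ValueJoinerWithKey overload also sees the key; per the Javadoc above it must be treated as read-only.
        orders.join(
            customers,
            (customerId, amount, name) -> customerId + ": " + name + " spent " + amount,
            Joined.with(Serdes.String(), Serdes.Long(), Serdes.String())
        );
    }
}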
@@ -956,36 +956,36 @@ static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartition
}

@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
+ public <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner) {
return join(table, toValueJoinerWithKey(joiner));
}

@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner) {
+ public <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner) {
return join(table, joiner, Joined.with(null, null, null));
}

@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final Joined<K, V, VO> joined) {
+ public <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner,
+ final Joined<K, V, VTable> joined) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
return join(table, toValueJoinerWithKey(joiner), joined);
}

@Override
- public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
- final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
- final Joined<K, V, VO> joined) {
+ public <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+ final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
+ final Joined<K, V, VTable> joined) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");

- final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+ final JoinedInternal<K, V, VTable> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();

if (repartitionRequired) {
@@ -1141,15 +1141,15 @@ private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> g
builder);
}

@SuppressWarnings("unchecked")
private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinedInternal<K, V, VO> joinedInternal,
final boolean leftJoin) {
@SuppressWarnings({"unchecked", "resource"})
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final JoinedInternal<K, V, VTable> joinedInternal,
final boolean leftJoin) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");

- final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
+ final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));

final NamedInternal renamed = new NamedInternal(joinedInternal.name());

@@ -1158,7 +1158,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();

if (joinedInternal.gracePeriod() != null) {
- if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
+ if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferName = name + "-Buffer";
@@ -1171,19 +1171,19 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
);
}

- final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
- ((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
+ final ProcessorSupplier<K, V, K, ? extends VOut> processorSupplier = new KStreamKTableJoin<>(
+ ((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
joiner,
leftJoin,
Optional.ofNullable(joinedInternal.gracePeriod()),
bufferStoreBuilder
);

- final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(processorSupplier, name);
+ final ProcessorParameters<K, V, K, ? extends VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
name,
processorParameters,
- ((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
+ ((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
this.name,
joinedInternal.gracePeriod()
);
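The versioned-table guard in doStreamTableJoin() above only fires when a grace period is configured on the Joined. Below is a hedged sketch of a join that satisfies that check, assuming the versioned-store and grace-period APIs available in current Kafka Streams releases (Stores.persistentVersionedKeyValueStore, Materialized.as(VersionedBytesStoreSupplier), Joined#withGracePeriod); topic names, retention, and value types are illustrative only.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class GracePeriodJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // The table side must be backed by a versioned store; otherwise the
        // IllegalArgumentException shown in the diff above is thrown at topology build time.
        final KTable<String, String> customers = builder.table(
            "customers",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String>as(
                Stores.persistentVersionedKeyValueStore("customers-versioned", Duration.ofHours(1)))
        );

        final KStream<String, Long> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));

        // With a grace period, stream records are held in the "<join name>-Buffer" store created
        // in doStreamTableJoin(), so slightly late table updates can still be matched.
        orders.join(
                customers,
                (amount, name) -> name + " spent " + amount,
                Joined.with(Serdes.String(), Serdes.Long(), Serdes.String())
                    .withName("orders-customers-join")
                    .withGracePeriod(Duration.ofMinutes(5))
            )
            .to("enriched-orders");
    }
}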
@@ -23,23 +23,24 @@
import org.apache.kafka.streams.state.StoreBuilder;

import java.time.Duration;
+ import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import static java.util.Collections.singleton;

- class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+ class KStreamKTableJoin<K, V, VTable, VOut> implements ProcessorSupplier<K, V, K, VOut> {

- private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
- private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
- private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner;
+ private final KeyValueMapper<K, V, K> keyValueMapper = (key, value) -> key;
+ private final KTableValueGetterSupplier<K, VTable> valueGetterSupplier;
+ private final ValueJoinerWithKey<? super K, ? super V, ? super VTable, VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<String> storeName;
private final Set<StoreBuilder<?>> stores;

- KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
- final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner,
+ KStreamKTableJoin(final KTableValueGetterSupplier<K, VTable> valueGetterSupplier,
+ final ValueJoinerWithKey<? super K, ? super V, ? super VTable, VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<StoreBuilder<?>> bufferStoreBuilder) {
@@ -49,11 +50,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
this.gracePeriod = gracePeriod;
this.storeName = bufferStoreBuilder.map(StoreBuilder::name);

- if (bufferStoreBuilder.isEmpty()) {
- this.stores = null;
- } else {
- this.stores = singleton(bufferStoreBuilder.get());
- }
+ this.stores = bufferStoreBuilder.<Set<StoreBuilder<?>>>map(Collections::singleton).orElse(null);
}

@Override
@@ -62,7 +59,7 @@ public Set<StoreBuilder<?>> stores() {
}

@Override
- public Processor<K, V1, K, VOut> get() {
+ public Processor<K, V, K, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
}

@@ -42,24 +42,24 @@
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

- class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcessor<K1, V1, K1, VOut> {
+ class KStreamKTableJoinProcessor<K, V, KTable, VTable, VOut> extends ContextualProcessor<K, V, K, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);

- private final KTableValueGetter<K2, V2> valueGetter;
- private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
- private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
+ private final KTableValueGetter<KTable, VTable> valueGetter;
+ private final KeyValueMapper<? super K, ? super V, ? extends KTable> keyMapper;
+ private final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
private final Optional<Duration> gracePeriod;
- private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
+ private TimeOrderedKeyValueBuffer<K, V, V> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext internalProcessorContext;
private final boolean useBuffer;
private final String storeName;

- KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
- final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
- final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner,
+ KStreamKTableJoinProcessor(final KTableValueGetter<KTable, VTable> valueGetter,
+ final KeyValueMapper<? super K, ? super V, ? extends KTable> keyMapper,
+ final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<String> storeName) {
@@ -73,7 +73,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
}

@Override
- public void init(final ProcessorContext<K1, VOut> context) {
+ public void init(final ProcessorContext<K, VOut> context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
@@ -89,7 +89,7 @@ public void init(final ProcessorContext<K1, VOut> context) {
}

@Override
- public void process(final Record<K1, V1> record) {
+ public void process(final Record<K, V> record) {
internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
@@ -107,8 +107,8 @@ public void process(final Record<K1, V1> record) {
}
}

- private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
- final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+ private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, V> toEmit) {
+ final Record<K, V> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
.withHeaders(toEmit.recordContext().headers());
final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
try {
@@ -124,23 +124,23 @@ protected void updateObservedStreamTime(final long timestamp) {
}

@SuppressWarnings("unchecked")
- private void doJoin(final Record<K1, V1> record) {
- final K2 mappedKey = keyMapper.apply(record.key(), record.value());
- final V2 value2 = getValue2(record, mappedKey);
+ private void doJoin(final Record<K, V> record) {
+ final KTable mappedKey = keyMapper.apply(record.key(), record.value());
+ final VTable value2 = getValue2(record, mappedKey);
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}
}

- private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
+ private VTable getValue2(final Record<K, V> record, final KTable mappedKey) {
if (mappedKey == null) return null;
- final ValueAndTimestamp<V2> valueAndTimestamp = valueGetter.isVersioned()
+ final ValueAndTimestamp<VTable> valueAndTimestamp = valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
return getValueOrNull(valueAndTimestamp);
}

- private boolean maybeDropRecord(final Record<K1, V1> record) {
+ private boolean maybeDropRecord(final Record<K, V> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers
@@ -149,7 +149,7 @@ private boolean maybeDropRecord(final Record<K1, V1> record) {
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
- final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+ final KTable mappedKey = keyMapper.apply(record.key(), record.value());
if (leftJoin && record.key() == null && record.value() != null) {
return false;
}
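The comment block in maybeDropRecord() above spells out the null-handling contract of the stream-table join. A small sketch of what that means at the DSL level; topics and types are illustrative, and the dropped-records accounting is standard Kafka Streams behavior rather than anything introduced by this commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinNullSemanticsSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Long> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));
        final KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // Inner join: stream records with a null key or a null value are dropped
        // (counted by the dropped-records sensor) and produce no output.
        orders.join(customers, (amount, name) -> name + " spent " + amount);

        // Left join: a null key with a non-null value is still processed, and a missing table
        // row arrives as a null second joiner argument -- so the joiner must handle null.
        orders.leftJoin(customers, (amount, name) -> (name == null ? "unknown" : name) + " spent " + amount);
    }
}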
