Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18644: improve generic type names for KStreamImpl and KTableImpl #18722

Merged
merged 4 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3044,7 +3044,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
* @see #map(KeyValueMapper)
*/
<KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
Copy link
Member Author

@mjsax mjsax Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is technically public API, to me it's a bug-fix, so would not want to do a KIP for it... Similar to KTable interface changes. -- On the other hand, it more than a one-liner... Overall not sure...

We did a KIP back in the days: https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API

Note, that some interfaces actually do use ? super/extend X even if the original KIP adding the methods does not say so... So we did diverge from it effectively already.

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think given that we have current usage of ? super/extend X in the code base, we don't need a KIP.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should be covered by KIP-100 in spirit, I don't think you need a KIP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax sorry I just saw this from the community slack --- I read the JIRA ticket and the PR itself, and this looks great to me (we should have done it a while ago, thanks for completing it). As for #18721 (comment) I gave some thought on it, and KLeft/KRight/VLeft/VRight sounds reasonable as well.

final String... stateStoreNames
);

Expand Down Expand Up @@ -3144,7 +3144,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);
Expand Down Expand Up @@ -3244,7 +3244,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
* @see #process(ProcessorSupplier, Named, String...)
*/
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final String... stateStoreNames
);

Expand Down Expand Up @@ -3344,7 +3344,7 @@ <VOut> KStream<K, VOut> processValues(
* @see #process(ProcessorSupplier, Named, String...)
*/
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);
Expand Down
74 changes: 38 additions & 36 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public interface KTable<K, V> {
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Named named);

/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
Expand Down Expand Up @@ -241,7 +242,8 @@ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named);

/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
Expand Down Expand Up @@ -1069,7 +1071,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> selector);

/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
Expand Down Expand Up @@ -1101,7 +1103,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> selector,
final Grouped<KR, VR> grouped);

/**
Expand Down Expand Up @@ -2109,8 +2111,8 @@ <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
Expand All @@ -2127,8 +2129,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
Expand All @@ -2149,8 +2151,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2172,8 +2174,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
Expand All @@ -2192,8 +2194,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2213,8 +2215,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2238,8 +2240,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2264,8 +2266,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2284,8 +2286,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains only those records that satisfy the given predicate
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
Expand All @@ -2302,8 +2304,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains only those records that satisfy the given predicate
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
Expand All @@ -2323,8 +2325,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2345,8 +2347,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2366,8 +2368,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2387,8 +2389,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2412,8 +2414,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2438,8 +2440,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,20 @@ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?
return allSourceNodes;
}

static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
static <VRight, VLeft, VOut> ValueJoiner<VRight, VLeft, VOut> reverseJoiner(final ValueJoiner<VLeft, VRight, VOut> joiner) {
return (value2, value1) -> joiner.apply(value1, value2);
}

static <K, T2, T1, R> ValueJoinerWithKey<K, T2, T1, R> reverseJoinerWithKey(final ValueJoinerWithKey<K, T1, T2, R> joiner) {
static <K, VRight, VLeft, VOut> ValueJoinerWithKey<K, VRight, VLeft, VOut> reverseJoinerWithKey(final ValueJoinerWithKey<K, VLeft, VRight, VOut> joiner) {
return (key, value2, value1) -> joiner.apply(key, value1, value2);
}

static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return (readOnlyKey, value) -> valueMapper.apply(value);
}

static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other methods have generics of ? super or ? extends why not here - asking more more my own education.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it from my understanding (please correct me if I am wrong).

Assume, we have KStreams<K, A> and KStream<K, B> that we join, to result KStream<K, C>. (All K,A,B,C are concrete types)

We use ValueJoiner<KFoo, V, W, X> (which are concrete types) with K extend KFoo, A extends V, B extends W and (reverser for output), X extends C, we just want to translate the ValueJoiner to <KFoo, W, V, X> which are still concrete types. And yes, these concreate types must match ? extends... and ? super for the KStreams we process, but they still do.

Thus, we don't need this helper to be able to translate a child/super type of ValueJoiner<KFoo, V, W, X> -- it's sufficient to just translate the concrete types <KFoo, V, W, X>.

Or do I get this wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, that sounds good to me, thanks for the clarification.

Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

public class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {

private final ForeachAction<K, V> action;
private final ForeachAction<? super K, ? super V> action;

public ForeachProcessor(final ForeachAction<K, V> action) {
public ForeachProcessor(final ForeachAction<? super K, ? super V> action) {
this.action = action;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
this.userProvidedRepartitionTopicName = groupedInternal.name();
}

private <VAgg> KTable<K, VAgg> doAggregate(final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {

private <VAgg> KTable<K, VAgg> doAggregate(
final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized
) {
final String sinkName = named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME);
final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME);
final String funcName = named.orElseGenerateWithPrefix(builder, functionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

class KStreamFilter<K, V> implements FixedKeyProcessorSupplier<K, V, V> {

private final Predicate<K, V> predicate;
private final Predicate<? super K, ? super V> predicate;
private final boolean filterNot;

public KStreamFilter(final Predicate<K, V> predicate, final boolean filterNot) {
public KStreamFilter(final Predicate<? super K, ? super V> predicate, final boolean filterNot) {
this.predicate = predicate;
this.filterNot = filterNot;
}
Expand Down
Loading