KAFKA-18644: improve generic type names for KStreamImpl and KTableImpl #18722

Open
wants to merge 4 commits into
base: trunk

Conversation

mjsax
Member

@mjsax mjsax commented Jan 28, 2025

No description provided.

@mjsax mjsax added the streams label Jan 28, 2025
@@ -3062,7 +3062,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
* @see #map(KeyValueMapper)
*/
<KOut, VOut> KStream<KOut, VOut> process(
- final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
+ final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
Member Author

@mjsax mjsax Jan 28, 2025

While this is technically public API, to me it's a bug fix, so I would not want to do a KIP for it... Similar to the KTable interface changes. -- On the other hand, it is more than a one-liner... Overall not sure...

We did a KIP back in the day: https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API

Note that some interfaces actually do use ? super/extends X even if the original KIP adding the methods does not say so... So we effectively diverged from it already.

Thoughts?

Member

I think given that we have current usage of ? super/extend X in the code base, we don't need a KIP.

Member

Yeah, this should be covered by KIP-100 in spirit, I don't think you need a KIP.
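
To illustrate why the relaxed `? extends` bounds on the output types matter, here is a minimal sketch. It does not use the Kafka Streams API: `MiniSupplier`, `Animal`, `Cat`, and `process` are hypothetical, simplified stand-ins (a single output type instead of `KOut`/`VOut`), and the generic return type `List<VOut>` only plays the role that `KStream<KOut, VOut>` plays in the real signature.

```java
import java.util.List;
import java.util.function.BiFunction;

public class RelaxedOutputSketch {
    interface Animal { String name(); }

    static final class Cat implements Animal {
        private final String name;
        Cat(final String name) { this.name = name; }
        @Override public String name() { return name; }
    }

    // Hypothetical, simplified stand-in for a processor supplier:
    // it hands out a function (key, value) -> output value.
    interface MiniSupplier<KIn, VIn, VOut> {
        BiFunction<KIn, VIn, VOut> get();
    }

    // Relaxed signature: the supplier may accept supertypes of K and V
    // and may produce any subtype of VOut.
    static <K, V, VOut> List<VOut> process(
            final MiniSupplier<? super K, ? super V, ? extends VOut> supplier,
            final K key,
            final V value) {
        final VOut result = supplier.get().apply(key, value);
        return List.of(result);
    }

    public static List<Animal> demo() {
        // Accepts any Object key and produces Cat values.
        final MiniSupplier<Object, String, Cat> catSupplier =
            () -> (key, value) -> new Cat(value);
        // With a strict parameter type MiniSupplier<? super K, ? super V, VOut>,
        // this call would be rejected, because Cat is not exactly Animal.
        return RelaxedOutputSketch.<Integer, String, Animal>process(catSupplier, 1, "Tom");
    }
}
```

The point of the sketch is only that a supplier typed with a subtype output can feed a result typed with the supertype once the parameter uses `? extends`.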

@mjsax mjsax force-pushed the kafka-18644-dls-type-safely-kstream-ktable branch from 7abb52a to 153e0b4 Compare January 28, 2025 03:35
Member

@bbejeck bbejeck left a comment

Thanks for the PR @mjsax , overall looks good to me - just a couple of questions

Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return (readOnlyKey, value) -> valueMapper.apply(value);
}

- static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
+ static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {
Member

All the other methods have generics with ? super or ? extends; why not here? Asking more for my own education.

Member Author

We don't need it from my understanding (please correct me if I am wrong).

Assume we have a KStream<K, A> and a KStream<K, B> that we join into a result KStream<K, C>. (All of K, A, B, C are concrete types.)

We use ValueJoiner<KFoo, V, W, X> (which are concrete types) with K extends KFoo, A extends V, B extends W, and (reversed for the output) X extends C; we just want to translate the ValueJoiner to <KFoo, W, V, X>, which are still concrete types. And yes, these concrete types must match the ? extends... and ? super bounds for the KStreams we process, but they still do.

Thus, we don't need this helper to be able to translate a child/super type of ValueJoiner<KFoo, V, W, X> -- it's sufficient to just translate the concrete types <KFoo, V, W, X>.

Or do I get this wrong?

Member

Nope, that sounds good to me, thanks for the clarification.
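
The argument in this thread can be sketched in isolation. The interfaces below are hypothetical stand-ins that only mirror the shape of `ValueJoiner` and `ValueJoinerWithKey`; the `join` method is an assumed caller carrying the wildcard bounds. The adapter itself uses plain type parameters, and the wildcard bounds at the call site are still satisfied.

```java
import java.util.Objects;

public class JoinerAdapterSketch {
    // Hypothetical stand-ins, not the Kafka Streams interfaces.
    interface MiniValueJoiner<VLeft, VRight, VOut> {
        VOut apply(VLeft left, VRight right);
    }

    interface MiniValueJoinerWithKey<K, VLeft, VRight, VOut> {
        VOut apply(K readOnlyKey, VLeft left, VRight right);
    }

    // Adapter with concrete type parameters only: no wildcards needed here.
    static <K, VLeft, VRight, VOut> MiniValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(
            final MiniValueJoiner<VLeft, VRight, VOut> valueJoiner) {
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        return (readOnlyKey, left, right) -> valueJoiner.apply(left, right);
    }

    // Assumed caller: this is where the wildcard bounds live.
    static <K, V, VO, VOut> VOut join(
            final MiniValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VOut> joiner,
            final K key,
            final V left,
            final VO right) {
        return joiner.apply(key, left, right);
    }

    public static CharSequence demo() {
        // The joiner accepts supertypes of the stream values and returns a subtype of the result.
        final MiniValueJoiner<Object, Number, String> joiner = (left, right) -> left + "-" + right;
        return JoinerAdapterSketch.<String, String, Integer, CharSequence>join(
            toValueJoinerWithKey(joiner), "k", "a", 1);
    }
}
```

The adapted joiner keeps the joiner's own concrete types (`Object`, `Number`, `String`), and those still match the bounds required by `join`.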

@@ -153,9 +153,9 @@ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
Objects.requireNonNull(named, "named can't be null");

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
- final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+ final ProcessorParameters<K, V, K, V> processorParameters =
Member

Why the changes to the generic types? For example, from ? super K to K.

Member Author

We need ? super K only for the user-implemented interfaces / UDFs. This allows processing a KStream of type <K,V> with a UDF that accepts a supertype of K and/or V (or returns a child type, e.g., ? extends VOut).

However, it does not change the type of the input KStream itself -- nor the type of the result KStream... The actual/concrete type is still <K,V>.

Assume we filter a KStream<Void,Cat> and apply a Predicate<Void,Animal> which gives us animals with black fur. The result KStream is still <Void,Cat>...

If we map a KStream<Void,Something> using a ValueMapper<Something,Cat>, we can still treat the result as KStream<Void,Animal>.

Thus, for the internal ProcessorParameters we only need to track the actual/concrete types, which are independent of potential super/child types for UDF generics.

The code compiles both ways for ProcessorParameters, but it's just "silly" to use anything except the concrete types, as the compiler cannot check anything else anyway....
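
The Cat/Animal filter case can be sketched without Kafka Streams. `MiniStream`, `Animal`, and `Cat` below are hypothetical types invented for illustration; only the wildcard pattern on the predicate parameter mirrors the `filter` signature quoted in the diff.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

public class FilterTypesSketch {
    interface Animal { String furColor(); }

    static final class Cat implements Animal {
        private final String name;
        private final String furColor;
        Cat(final String name, final String furColor) { this.name = name; this.furColor = furColor; }
        String name() { return name; }
        @Override public String furColor() { return furColor; }
    }

    // Hypothetical stand-in for a typed stream; keys are omitted and passed as null.
    static final class MiniStream<K, V> {
        private final List<V> values;
        MiniStream(final List<V> values) { this.values = values; }
        List<V> values() { return values; }

        // The predicate may be written against supertypes, but the result keeps <K, V>.
        MiniStream<K, V> filter(final BiPredicate<? super K, ? super V> predicate) {
            final List<V> kept = values.stream()
                .filter(value -> predicate.test(null, value))
                .collect(Collectors.toList());
            return new MiniStream<>(kept);
        }
    }

    public static List<Cat> demo() {
        final MiniStream<Void, Cat> cats =
            new MiniStream<>(List.of(new Cat("Tom", "black"), new Cat("Kit", "white")));
        // Predicate written against the supertype Animal.
        final BiPredicate<Void, Animal> hasBlackFur = (key, animal) -> "black".equals(animal.furColor());
        // The filtered stream is still a stream of Cat, not of Animal.
        final MiniStream<Void, Cat> blackCats = cats.filter(hasBlackFur);
        return blackCats.values();
    }
}
```

Only the user-supplied predicate needs the `? super` bounds; the stream's own type parameters stay concrete before and after the call.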

@@ -248,9 +248,9 @@ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
Objects.requireNonNull(named, "named can't be null");

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
- final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+ final ProcessorParameters<K, V, KR, VR> processorParameters =
Member

And here from ? to KR

Member Author

@mjsax mjsax Jan 29, 2025

For map() we know that the output types are KR and VR -- so why should we leave them unchecked as ? -- we know the types and the compiler can verify them for us.

I believe we used ? to work with the old PAPI, which did not have output types on the Processor interface, but now that we have the new api.Processor interface in place, we can improve type safety by enforcing the check.
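
As a rough illustration of what is gained by replacing `?` with the known output type, consider the sketch below. `MiniParams` is a hypothetical holder that is only loosely analogous to the internal `ProcessorParameters`; it tracks a single input and output type for brevity.

```java
import java.util.function.Function;

public class TypedParamsSketch {
    // Hypothetical holder, not the Kafka Streams class.
    static final class MiniParams<VIn, VOut> {
        final Function<VIn, VOut> mapper;
        MiniParams(final Function<VIn, VOut> mapper) { this.mapper = mapper; }
    }

    // Output type left open as `?`: the compiler can only promise Object.
    static <VIn> Object applyUnchecked(final MiniParams<VIn, ?> params, final VIn value) {
        return params.mapper.apply(value);
    }

    // Output type tracked: the compiler verifies the result type for us.
    static <VIn, VOut> VOut applyChecked(final MiniParams<VIn, VOut> params, final VIn value) {
        return params.mapper.apply(value);
    }

    public static int demo() {
        final MiniParams<String, Integer> lengthOf = new MiniParams<>(String::length);
        // Needs a cast before it can be used as an Integer.
        final Object untyped = applyUnchecked(lengthOf, "kafka");
        // No cast needed; a mismatch would be a compile-time error.
        final Integer typed = applyChecked(lengthOf, "kafka");
        return (Integer) untyped + typed;
    }
}
```

Both variants compile, which matches the remark in the thread, but only the typed variant lets the compiler catch a wrong output type.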

final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {

static <KStream, VStream, RepartitionNode extends BaseRepartitionNode<KStream, VStream>> String createRepartitionedSource(
final InternalStreamsBuilder builder,
Member

why KStream and VStream? Are there two streams involved here? It's been a while since I've looked at this part of the code.

Member Author

Oh. I see the confusion. My bad. Not a good name... I just meant the key type of a stream... K1 is just terrible...

Re-using <K,V>, even if possible, would imply shadowing the KStream<K,V> generics (which might be OK, as we don't need access to them, but it's not recommended in general), so we should use a different name.

Any ideas for a good name?

Member Author

Updating to Key and Value for now. Happy to pick something else if you have a good idea.

Member

sounds good to me, thanks for clearing that up - and yeah naming is hard.

Member Author

Yes. Cf #18721 (comment) -- Same question... Any ideas?

We cannot just use Key, and Value there... 🤔

final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner,
final boolean leftJoin,
final Named named) {
private <KGlobalTable, VGlobalTable, VOut> KStream<K, VOut> globalTableJoin(
Member

nit: would GlobalTableK be more descriptive than KGlobalTable? Same for VGlobalTable.

Member Author

Guess it's the same question as above -- we might want to use some different name to begin with. Any ideas?

Member Author

Updating to GlobalKey and GlobalValue for now. Happy to pick something else if you have a good idea.

Member

@bbejeck bbejeck left a comment

Thanks for the comment responses. LGTM pending tests passing

@mjsax
Member Author

mjsax commented Jan 30, 2025

Java 23 passed.

Java 17:

Found 1 test failures:
FAILED ❌ KafkaAdminClientTest > testAdminClientApisAuthenticationFailure()
Found 5 flaky test failures:
FLAKY ⚠️  KafkaShareConsumerTest > testVerifyFetchAndCloseImplicit()
FLAKY ⚠️  AbstractCoordinatorTest > testWakeupAfterSyncGroupReceivedExternalCompletion()
FLAKY ⚠️  AbstractCoordinatorTest > testWakeupAfterSyncGroupReceived()
FLAKY ⚠️  StickyAssignorTest > testLargeAssignmentAndGroupWithUniformSubscription(boolean).hasConsumerRack = false
FLAKY ⚠️  PlaintextAdminIntegrationTest > testConsumerGroups(String, String).quorum=kraft.groupProtocol=consumer

@mjsax
Member Author

mjsax commented Jan 30, 2025

Rebased to resolve conflicts. Will merge after CI finishes.

@mjsax mjsax force-pushed the kafka-18644-dls-type-safely-kstream-ktable branch from e83ac66 to f6364b6 Compare January 30, 2025 17:46
3 participants