MINOR: cleanup KStream JavaDocs (4/N) - stream-table-inner-join #18721

Open
wants to merge 11 commits into
base: trunk

Conversation

mjsax
Member

@mjsax mjsax commented Jan 28, 2025

No description provided.

@mjsax mjsax added the streams label Jan 28, 2025
@@ -42,24 +42,24 @@
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

-class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcessor<K1, V1, K1, VOut> {
+class KStreamKTableJoinProcessor<KStream, VStream, KTable, VTable, VOut> extends ContextualProcessor<KStream, VStream, KStream, VOut> {
Member Author

Flipping type order -- no splash radius -- change limited to this line only, but diff is weird.

Effectively renaming K1 -> KStream, V1 -> VStream, K2 -> KTable, and V2 -> VTable.

Member Author

@bbejeck mentioned on some other PR that KStream (and KTable) is a confusing generic type variable name, as we also have the interfaces KStream and KTable.

Might be good to rename this differently, but I am not sure how. Ideas?

Member

KLeft and KRight?

Member Author

Maybe.

I was thinking about StreamKey/StreamValue/TableKey/TableValue to highlight that it's two different "objects". Left/Right would not express this explicitly.
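
To make the naming concern concrete, here is a minimal sketch of why a type variable called `KStream` is confusing: inside the class it shadows any interface of the same name. The `KStream` interface below is a hypothetical stand-in declared only for this example (it is not the real Kafka Streams interface), and both class names are made up for illustration.

```java
// Hypothetical stand-in for an interface named KStream; declared locally
// so the example compiles without any Kafka dependency.
interface KStream<K, V> { }

// Here "KStream" is a type variable. Inside this class the name refers to
// the type variable, so the interface above is shadowed and cannot be used
// as KStream<..., ...>.
final class ShadowedNamesSketch<KStream, VStream> {
    private final KStream key;
    private final VStream value;

    ShadowedNamesSketch(KStream key, VStream value) {
        this.key = key;
        this.value = value;
    }

    KStream key() { return key; }
    VStream value() { return value; }
}

// With StreamKey/StreamValue as type variables, nothing is shadowed and the
// interface name keeps its usual meaning inside the class.
final class DistinctNamesSketch<StreamKey, StreamValue> {
    private final StreamKey key;
    private final StreamValue value;
    private final KStream<StreamKey, StreamValue> source; // refers to the interface

    DistinctNamesSketch(StreamKey key, StreamValue value, KStream<StreamKey, StreamValue> source) {
        this.key = key;
        this.value = value;
        this.source = source;
    }

    StreamKey key() { return key; }
    StreamValue value() { return value; }
    KStream<StreamKey, StreamValue> source() { return source; }
}
```

Both variants compile; the difference is only in how easily a reader can tell a type variable from the interface.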

Member

@lucasbru lucasbru left a comment

Thanks for the PR @mjsax, LGTM. Left some minor comments; you can decide yourself whether you want to address them.

* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join.
Member

nit: I think usually it's written equi-join not equi join.
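
For readers unfamiliar with the term, the following is a rough sketch of what a non-windowed inner equi-join between a stream and a table amounts to: each stream record is looked up by key in the current table state, and a result is emitted only if a matching table entry exists. This is a plain-Java illustration using in-memory collections, not the Kafka Streams implementation; the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class StreamTableInnerJoinSketch {

    // Joins every stream record against the given table state.
    // Inner join: stream records without a matching table entry produce no output.
    // Equi-join: the match is an exact lookup on the record key.
    static <K, V, VT, VOut> List<Map.Entry<K, VOut>> join(
            List<Map.Entry<K, V>> streamRecords,
            Map<K, VT> tableState,
            BiFunction<V, VT, VOut> joiner) {
        List<Map.Entry<K, VOut>> output = new ArrayList<>();
        for (Map.Entry<K, V> record : streamRecords) {
            VT tableValue = tableState.get(record.getKey());
            if (tableValue != null) {
                VOut joined = joiner.apply(record.getValue(), tableValue);
                output.add(new AbstractMap.SimpleImmutableEntry<>(record.getKey(), joined));
            }
        }
        return output;
    }
}
```

Only the stream side drives output in this sketch; the table is used purely as a lookup.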

* smaller than the defined grace-period allows; these "late records" will be dropped, and not join computation
* is triggered.
* Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides
* a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state.
Member

Suggested change
- * a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state.
+ * a cut-off point, and late records will be dropped, not updating the {@link KTable} state.

* Note, that using a join grace-period introduces the notion of "late records", i.e., records with a timestamp
* smaller than the defined grace-period allows; these "late records" will be dropped, and not join computation
* is triggered.
* Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides
Member

It's a bit unclear what you mean by history retention here. It's quoted and not mentioned before, not explained.
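
One way to picture the "cut-off point" wording in the quoted JavaDoc is as a bound relative to the highest timestamp observed so far: a record older than that bound is treated as late and dropped. The sketch below assumes the cut-off is computed as observed time minus the configured period, with the boundary itself still accepted; both are assumptions made for illustration and are not confirmed by this PR. The class name is hypothetical, and the same shape would apply to a join grace-period or to a history retention.

```java
final class LateRecordCutoffSketch {
    private final long periodMs;                       // grace-period or history retention
    private long observedStreamTime = Long.MIN_VALUE;  // highest timestamp seen so far

    LateRecordCutoffSketch(long periodMs) {
        this.periodMs = periodMs;
    }

    // Returns true if the record is accepted, false if it is "late" and dropped.
    // Assumption: a record exactly at the cut-off is still accepted.
    boolean accept(long recordTimestamp) {
        observedStreamTime = Math.max(observedStreamTime, recordTimestamp);
        long cutoff = observedStreamTime - periodMs;
        return recordTimestamp >= cutoff;
    }
}
```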

* Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides
* a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state.
* <p>If a join grace-period is specified, the {@code KStream} will be materialized in a local state store.
Member

It seems to me that this paragraph would better be placed before the explanation of versioned state stores, since it's explaining state stores generally?

Member Author

It's this way on purpose. Note, it's about KStream-side materialization, and we only materialize the stream if a grace-period is specified -- but we only allow this if the KTable is versioned.

For regular/non-versioned KTables, using a grace-period and materializing/buffering the stream records does not make sense -- if there are not multiple versions per key, there is no point in delaying the KTable lookup.

Thoughts?

Member

Aah, yes. Sorry, I misread this paragraph, still thinking about the KTable. This makes sense.
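
The point about versions can be sketched with a small in-memory stand-in for a versioned table: it keeps several timestamped values per key, so a lookup can ask for the value that was valid at the stream record's timestamp rather than only the latest one. This is a hypothetical illustration built on `TreeMap`, not the Kafka Streams versioned state store; tombstones, retention, and cleanup are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class VersionedTableSketch<K, V> {
    // Per key: timestamp -> value, sorted by timestamp.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Value valid at the given timestamp: the newest version that is not
    // newer than asOfTimestamp, or null if no such version exists.
    V get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // What a non-versioned table would effectively return: the latest value only.
    V getLatest(K key) {
        TreeMap<Long, V> history = versions.get(key);
        return (history == null || history.isEmpty()) ? null : history.lastEntry().getValue();
    }
}
```

With only `getLatest` available, a delayed lookup has no timestamp to resolve against, which is one way to read the argument that buffering stream records only pays off when the table keeps multiple versions per key.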

@lucasbru
Member

Updates look good to me

@mjsax
Member Author

mjsax commented Jan 30, 2025

Java 17:

Found 1 test failures:
FAILED ❌ KafkaAdminClientTest > testAdminClientApisAuthenticationFailure()
Found 3 flaky test failures:
FLAKY ⚠️  EagerConsumerCoordinatorTest > testOutdatedCoordinatorAssignment()
FLAKY ⚠️  DefaultStateUpdaterTest > shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask()
FLAKY ⚠️  DeleteSegmentsByRetentionTimeTest > executeTieredStorageTest(String, String).quorum=kraft.groupProtocol=consumer

Java 23:

Found 1 test failures:
FAILED ❌ KafkaAdminClientTest > testAdminClientApisAuthenticationFailure()
Found 4 flaky test failures:
FLAKY ⚠️  TransactionsTest > "testBumpTransactionalEpochWithTV2Disabled(String, String, boolean).quorum=kraft, groupProtocol=classic, isTV2Enabled=false"
FLAKY ⚠️  TransactionsTest > "testBumpTransactionalEpochWithTV2Disabled(String, String, boolean).quorum=kraft, groupProtocol=consumer, isTV2Enabled=false"
FLAKY ⚠️  PlaintextAdminIntegrationTest > testConsumerGroups(String, String).quorum=kraft.groupProtocol=consumer
FLAKY ⚠️  PlaintextAdminIntegrationTest > testConsumerGroups(String, String).quorum=kraft.groupProtocol=classic

Fix for KafkaAdminClientTest is already on the way: #18735

mjsax and others added 2 commits January 29, 2025 19:19
@mjsax mjsax added the docs label Jan 30, 2025
2 participants