Skip to content

Conversation

rkhachatryan
Copy link
Contributor

@rkhachatryan rkhachatryan commented Aug 11, 2025

Replace List of matched records with Iterator to avoid OOM errors in StreamingJoinOperator and the like.
Currently, PR targets only the sync operator version - see the discussion below.

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 11, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan
Copy link
Contributor Author

Originally, I fixed the issue in Flink 1.19 by replacing List with Iterator.

While porting the fix to 2.0, I discovered that most of the code was extracted to be re-used in async versions of the operators.

However, without some deeper rework, I don't see how my change can be used in async way; because async iterator requires callbacks to be passed in.

So I'm inclined to have two different versions for sync (iterator) and async (list) for now.

WDYT @xuyangzhong, @pnowojski ?

@rkhachatryan rkhachatryan marked this pull request as ready for review September 17, 2025 12:32
Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM assuming green build

@rkhachatryan
Copy link
Contributor Author

@flinkbot run azure

1 similar comment
@rkhachatryan
Copy link
Contributor Author

@flinkbot run azure

@rkhachatryan
Copy link
Contributor Author

This failure seems to be unrelated:

Sep 17 17:50:34 17:50:34.902 [ERROR] Failures: 
Sep 17 17:50:34 17:50:34.902 [ERROR]   ChangelogNormalizeRestoreTest>RestoreTestBase.testRestore:397 
Sep 17 17:50:34 Expecting actual:
Sep 17 17:50:34   ["+I[two, 2, b]",
Sep 17 17:50:34     "+I[one, 1, a]",
Sep 17 17:50:34     "+I[three, 3, c]",
Sep 17 17:50:34     "-U[one, 1, a]",
Sep 17 17:50:34     "+U[one, 1, aa]",
Sep 17 17:50:34     "-U[three, 3, c]",
Sep 17 17:50:34     "+U[three, 3, cc]",
Sep 17 17:50:34     "-D[two, 2, b]",
Sep 17 17:50:34     "+I[four, 4, d]",
Sep 17 17:50:34     "+I[five, 5, e]",
Sep 17 17:50:34     "-U[four, 4, d]",
Sep 17 17:50:34     "+U[four, 4, dd]"]
Sep 17 17:50:34 to contain exactly in any order:
Sep 17 17:50:34   ["+I[one, 1, a]",
Sep 17 17:50:34     "+I[two, 2, b]",
Sep 17 17:50:34     "-U[one, 1, a]",
Sep 17 17:50:34     "+U[one, 1, aa]",
Sep 17 17:50:34     "+I[three, 3, c]",
Sep 17 17:50:34     "-D[two, 2, b]",
Sep 17 17:50:34     "-U[three, 3, c]",
Sep 17 17:50:34     "+U[three, 3, cc]",
Sep 17 17:50:34     "+I[four, 4, d]",
Sep 17 17:50:34     "+I[five, 5, e]",
Sep 17 17:50:34     "-U[four, 4, d]",
Sep 17 17:50:34     "+U[four, 4, dd]",
Sep 17 17:50:34     "+I[six, 6, f]",
Sep 17 17:50:34     "-D[six, 6, f]"]
Sep 17 17:50:34 but could not find the following elements:
Sep 17 17:50:34   ["+I[six, 6, f]", "-D[six, 6, f]"]

Re-running the CI.

@rkhachatryan
Copy link
Contributor Author

The failure is unrelated: https://issues.apache.org/jira/browse/FLINK-35012 .
Merging

@rkhachatryan rkhachatryan merged commit d08abb7 into apache:master Sep 18, 2025
@rkhachatryan
Copy link
Contributor Author

Originally, I fixed the issue in Flink 1.19 by replacing List with Iterator.
While porting the fix to 2.0, I discovered that most of the code was extracted to be re-used in async versions of the operators.
However, without some deeper rework, I don't see how my change can be used in async way; because async iterator requires callbacks to be passed in.
So I'm inclined to have two different versions for sync (iterator) and async (list) for now.
WDYT @xuyangzhong, @pnowojski ?

We've discussed this offline with @pnowojski and @Zakelly and decided to have separate versions of the code for now
but create a ticket for the async version: https://issues.apache.org/jira/browse/FLINK-38376

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants