Skip to content

[FLINK-37725] Makes Async calcs share correlate split rule with Python #26505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2025

Conversation

AlanConfluent
Copy link
Contributor

@AlanConfluent AlanConfluent commented Apr 24, 2025

What is the purpose of the change

Moves PythonCorrelateSplitRule to be common as RemoteCorrelateSplitRule so that we can move Async Scalars out of correlate queries. Today an attempt to use an AsyncCalc in a correlate results in a codegen error:

    @Test
    public void testTableFuncWithAsyncCalc() {
        Table t1 = tEnv.fromValues(1, 2).as("f1");
        tEnv.createTemporaryView("t1", t1);
        tEnv.createTemporarySystemFunction("func", new RandomTableFunction());
        tEnv.createTemporarySystemFunction("addTen", new AsyncFuncAdd10());
        final List<Row> results = executeSql("select * FROM t1, LATERAL TABLE(func(addTen(f1)))");
        final List<Row> expectedRows =
                Arrays.asList(
                        Row.of(1, "blah 11"),
                        Row.of(1, "foo 11"),
                        Row.of(2, "blah 12"),
                        Row.of(2, "foo 12"));
        assertThat(results).containsSequence(expectedRows);
    }

Results in error:

Caused by: org.codehaus.commons.compiler.CompileException: Line 99, Column 15: Unknown variable or type "f"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7230) 

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 24, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@AlanConfluent AlanConfluent force-pushed the FLINK-37725 branch 2 times, most recently from ec2d2e0 to 8489b71 Compare May 13, 2025 23:37
@dawidwys dawidwys changed the title Makes Async calcs share correlate split rule with Python [FLINK-37725] Makes Async calcs share correlate split rule with Python May 14, 2025
@dawidwys dawidwys requested a review from Copilot May 14, 2025 10:22
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the correlate split rules for async calcs and Python calls by unifying them under a common RemoteCorrelateSplitRule, thereby addressing codegen errors when using AsyncCalc in correlate queries.

  • Migrates Python-specific correlate splitting to a common rule with PythonRemoteCalcCallFinder
  • Updates test cases and rule sets to support asynchronous scalar functions in correlate queries
  • Introduces minor improvements to RemoteCalcCallFinder and AsyncCalcSplitRule implementations

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml Added test cases verifying the correlate plan transformation with async calcs.
flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java Introduced a new integration test for table functions with async calcs.
Other test and rule files Refactored and unified correlate split rules and updated the rule sets accordingly.
PythonCorrelateSplitRule.java Migrated legacy Python rule logic to leverage RemoteCorrelateSplitRule.
RemoteCorrelateSplitRule.java, AsyncCalcSplitRule.java Updated internal rule matching and equality logic to incorporate new RemoteCalcCallFinder implementations.

Comment on lines 101 to 102
return callFinder.isRemoteCall(rexNode) && callFinder.containsNonRemoteCall(rexNode)
|| callFinder.isNonRemoteCall(rexNode) && callFinder.containsRemoteCall(rexNode)
Copy link
Preview

Copilot AI May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding parentheses around the complex boolean condition involving callFinder checks to clarify operator precedence and improve readability.

Suggested change
return callFinder.isRemoteCall(rexNode) && callFinder.containsNonRemoteCall(rexNode)
|| callFinder.isNonRemoteCall(rexNode) && callFinder.containsRemoteCall(rexNode)
return (callFinder.isRemoteCall(rexNode) && callFinder.containsNonRemoteCall(rexNode))
|| (callFinder.isNonRemoteCall(rexNode) && callFinder.containsRemoteCall(rexNode))

Copilot uses AI. Check for mistakes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good comment from copilot

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

correlate.getRowType(),
newCorrelate);

call.transformTo(newTopCalc);
Copy link
Preview

Copilot AI May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider overriding hashCode() in RemoteCorrelateSplitRule to maintain consistency with the custom equals() implementation and ensure correct behavior in hash-based collections.

Copilot uses AI. Check for mistakes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


RemoteCorrelateSplitRule(RemoteCalcCallFinder callFinder) {
super(
operand(FlinkLogicalCorrelate.class, any()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're modifying the class, can you please replace the usage of deprecated operand and any?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, converted to the new form of rule with the config.

}

private ScalarFunctionSplitter createScalarFunctionSplitter(
RexProgram program,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDE tells me it's always null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the parameter and put null below. I also put a comment about how the scan should not contain any local references that need resolving, in my understanding. I could potentially make the program optional in ScalarFunctionSplitter and assert it exists before use when seeing a local reference, but that seems unnecessary.

}

FlinkLogicalCorrelate newCorrelate;
if (extractedRexNodes.size() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (extractedRexNodes.size() > 0) {
if (!extractedRexNodes.isEmpty()) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private static final RemoteCalcCallFinder ASYNC_CALL_FINDER = new AsyncRemoteCalcCallFinder();

public static final RelOptRule CORRELATE_SPLIT =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final RelOptRule CORRELATE_SPLIT =
public static final RelOptRule INSTANCE =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 101 to 102
return callFinder.isRemoteCall(rexNode) && callFinder.containsNonRemoteCall(rexNode)
|| callFinder.isNonRemoteCall(rexNode) && callFinder.containsRemoteCall(rexNode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good comment from copilot

import org.junit.jupiter.api.Test;

/** Test for {@link AsyncCorrelateSplitRule}. */
public class AsyncCorrelateSplitRuleTest extends TableTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please fix the compiler warnings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done. I believe they are all fixed.

/** Test for {@link AsyncCorrelateSplitRule}. */
public class AsyncCorrelateSplitRuleTest extends TableTestBase {

private TableTestUtil util = streamTestUtil(TableConfig.getDefault());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@BeforeEach
public void setup() {
FlinkChainedProgram programs = new FlinkChainedProgram<BatchOptimizeContext>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raw use of parameterized class 'FlinkChainedProgram'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.LOGICAL_REWRITE())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You use a BatchOptimizeContext context with Stream rule set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I may have copied this part of the test -- my mistake. Changed to StreamOptimizeContext.

@dawidwys dawidwys merged commit 53616d6 into apache:master May 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants