Conversation

@nastra
Contributor

@nastra nastra commented Nov 18, 2025

This pushes down the LIMIT from Spark to the underlying Scan. Spark is still expected to apply the LIMIT itself, but its value is pushed down through the Scan and used as min-rows-requested (introduced by #14565) for server-side scan planning. It serves as a hint so the server does not have to return more rows than necessary: the server is not required to return that many rows, since the scan may not produce them, and it may also return more rows than requested.
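
For readers less familiar with the Spark DSv2 side of this, the sketch below shows roughly how a pushed LIMIT can reach a scan builder and be forwarded as a row-count hint. This is a minimal illustration, not the PR's actual code: only Spark's SupportsPushDownLimit interface is assumed, and LimitAwareScanBuilder / pushedLimit() are invented names.

```java
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;

// Minimal sketch (invented names, not the PR's SparkScanBuilder): Spark hands the
// LIMIT to the scan builder via SupportsPushDownLimit, and the builder can forward
// it to the underlying scan as a min-rows-requested hint.
abstract class LimitAwareScanBuilder implements SupportsPushDownLimit {
  private Integer pushedLimit; // null means no LIMIT was pushed

  @Override
  public boolean pushLimit(int limit) {
    this.pushedLimit = limit;
    // isPartiallyPushed() defaults to true, so Spark still applies the LIMIT on
    // top of whatever the source returns; the pushed value is only a hint.
    return true;
  }

  protected Integer pushedLimit() {
    return pushedLimit; // a builder would pass this to the scan as the hint
  }
}
```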

@nastra nastra changed the title from "Spark, Core: Add Limit pushdown to Scan" to "Spark 4.0, Core: Add Limit pushdown to Scan" on Nov 18, 2025
Comment on lines 297 to 299
public ThisT minRowsRequested(int numRows) {
  return newRefinedScan(table, schema, context.minRowsRequested(numRows));
}
Contributor

Suggested change:
-  public ThisT minRowsRequested(int numRows) {
-    return newRefinedScan(table, schema, context.minRowsRequested(numRows));
-  }
+  public ThisT minRowsRequested(Integer numRows) {
+    return newRefinedScan(table, schema, context.minRowsRequested(numRows));
+  }

Contributor Author

why would we want to make this an Integer instead of an int?


/**
* Create a new scan that returns files with at least the given number of rows. This is used as a
* hint during server-side scan planning to not have to return more rows than necessary. It is not
Contributor

Why only server-side scan planning? We could extend this to any scan.

If the intention is strictly server-side scan planning, I would recommend a separate interface, so an implementation can implement both Scan and LimitAwareScan (?)

Contributor

@geruh geruh Nov 19, 2025

Yeah, but also this is a lot lighter than the open PR #13451, which adds some local optimizations for non-REST catalogs.

Contributor Author

I've removed that wording to not limit this to server-side scan planning

@nastra nastra closed this Nov 19, 2025
@nastra nastra reopened this Nov 19, 2025
@nastra nastra force-pushed the limit-pushdown-from-spark branch 2 times, most recently from bc87a63 to 501993f on November 19, 2025 08:37
Comment on lines +296 to +299
@Override
public ThisT minRowsRequested(long numRows) {
  return newRefinedScan(table, schema, context.minRowsRequested(numRows));
}
Contributor

How are we gating that this is only called for the RESTCatalog, given that for other catalogs it's a no-op?

Contributor Author

I don't think we have to gate this in any way, since this is an entirely optional optimization
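
To make the "entirely optional" point concrete, here is a hypothetical illustration of the optional-hint pattern; RowCountHintAware is an invented name, and whether the actual Scan API uses a default method like this is not shown in this thread.

```java
// Hypothetical sketch: an optional hint can default to a no-op, so implementations
// that cannot make use of it (e.g. non-REST catalogs) simply ignore it and no
// gating is required at the call site.
interface RowCountHintAware<ThisT> {
  @SuppressWarnings("unchecked")
  default ThisT minRowsRequested(long numRows) {
    return (ThisT) this; // ignore the hint by default; implementations may override
  }
}
```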

Contributor

Makes sense. Can we add a test for a metadata table read, since it would trigger a different scan object? WDYT?

Contributor Author

It would trigger a BaseMetadataTableScan, which also extends BaseScan, so this code should cover scans against both normal and metadata tables. Let me add a test for this.

@nastra nastra force-pushed the limit-pushdown-from-spark branch from 501993f to daa72f5 on November 24, 2025 13:37
Comment on lines +158 to +163
assertThat(sql("SELECT * FROM %s LIMIT 1", tableName)).containsExactly(first);
assertThat(sql("SELECT * FROM %s LIMIT 2", tableName)).containsExactly(first, second);
assertThat(sql("SELECT * FROM %s LIMIT 3", tableName)).containsExactly(first, second, third);
Contributor

My understanding is that this would succeed even without this change. How about inspecting the plan and extracting the scan node to check whether the limit percolated all the way down to it?

LogicalPlan logicalPlan = limitedDf.queryExecution().optimizedPlan();
Optional<Integer> limit =
        JavaConverters.asJavaCollection(logicalPlan.collectLeaves()).stream()
            .flatMap(
                plan -> {
                  if (!(plan instanceof DataSourceV2ScanRelation)) {
                    return Stream.empty();
                  }

                  DataSourceV2ScanRelation scanRelation = (DataSourceV2ScanRelation) plan;
                  if (!(scanRelation.scan() instanceof SparkBatchQueryScan)) {
                    return Stream.empty();
                  }

                  SparkBatchQueryScan batchQueryScan = (SparkBatchQueryScan) scanRelation.scan();
                  return Stream.ofNullable(batchQueryScan.pushedLimit());
                })
            .findFirst();

    return limit.orElse(null);

credits: https://github.com/apache/iceberg/pull/10943/files#diff-5de7aa01c6be719c3085c1f0416ed700fb78358103c8b26c6e3ecb9f063c04dbR270

Contributor Author

> My understanding is that this would succeed even without this change.

Yes, that's absolutely correct. The LIMIT is applied by Spark even without any of the changes introduced in this PR.

> How about inspecting the plan and extracting the scan node to check whether the limit percolated all the way down to it?

I'm not sure about this, since this would effectively test that the limit is actually pushed down to the SparkScan, which means we're just making sure that implementing SupportsPushDownLimit has the right effect of pushing the limit. In the context of #10943 this check made sense, since that PR was adding a flag to enable/disable limit pushdown, but here we're always pushing it down, and I've been going back and forth on the best way to properly test this.

Let me think a bit more about what an appropriate way of testing this would be

Contributor Author

I have added some tests to TestFilteredScan where we simulate the LIMIT pushdown and make sure that it properly gets passed down to the TableScanContext for different types of scans.

Contributor

I agree with how the new tests are set up; there's a clean separation between testing what actually gets pushed down (which verifies we're building the scans correctly) and an expectation based on the result of the pushdown.

@huaxingao
Contributor

I think we can probably combine this PR with #13451. After this PR is merged, we can proceed with #13451 to early-stop planning when the accumulated estimated rows ≥ minRowsRequested.
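
As a rough illustration of the early stop described here (a sketch of the idea in #13451 under assumptions, not that PR's implementation; the per-file record count is used below as a stand-in for whatever row estimate that PR relies on):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;

// Hypothetical sketch: stop accumulating planned tasks once the estimated row
// count covers the min-rows-requested hint.
class EarlyStopPlanning {
  static List<FileScanTask> limitTasks(Iterable<FileScanTask> tasks, long minRowsRequested) {
    List<FileScanTask> planned = new ArrayList<>();
    long estimatedRows = 0;
    for (FileScanTask task : tasks) {
      planned.add(task);
      // File-level record count as an estimate; not exact once row filters apply.
      estimatedRows += task.file().recordCount();
      if (minRowsRequested > 0 && estimatedRows >= minRowsRequested) {
        break; // enough estimated rows to satisfy the pushed LIMIT
      }
    }
    return planned;
  }
}
```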

Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Overall, this looks good to me, just some minor comments. Thanks @nastra!

Comment on lines +347 to +348
// verify CoW scan
assertThat(builder.buildCopyOnWriteScan())
Contributor

What about making sure it's pushed for distributed planning?

Contributor

OK nice, it looks like all these tests are parameterized on planning mode, which includes distributed.

Contributor

Though for this particular test for metadata tables, buildCopyOnWriteScan/buildMergeOnReadScan wouldn't be relevant right?

Contributor Author

Yeah, distributed and local planning are both already handled by the test parameterization. For the metadata tables I just wanted to make sure that the limit is properly pushed down to the TableScanContext, independent of whether buildCopyOnWriteScan/buildMergeOnReadScan actually makes sense on a metadata table, since technically these are callable from a user's perspective.


/**
* Create a new scan that returns files with at least the given number of rows. This is used as a
* hint and is entirely optional in order to not have to return more rows than necessary. It is
Contributor

Minor: since it's a code comment, I think we could simplify the last two sentences into one:

This may return fewer rows if the scan does not contain that many, or it may return more than requested.

Contributor Author

updated

@nastra nastra requested a review from singhpk234 December 2, 2025 08:38
@nastra nastra force-pushed the limit-pushdown-from-spark branch from a81000b to c26d174 on December 2, 2025 08:39