[SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources #49961

Closed
wants to merge 4 commits

Conversation

@wengh (Contributor) commented Feb 14, 2025

What changes were proposed in this pull request?

This PR adds support for filter pushdown to Python Data Source batch read, with an API similar to the SupportsPushDownFilters interface. The user can implement DataSourceReader.pushFilters to receive filters that may be pushed down, decide which filters to push down, remember them, and return the remaining filters to be applied by Spark.

Note that filter pushdown is only supported for batch read, not for streaming read. This is also the case for the Scala API. Therefore the new API is added to DataSourceReader and not to DataSource or DataSourceStreamReader.

To keep the Python API simple, we will only support V1 filters that have a column, a boolean operator, and a literal value. The filter serialization is a placeholder and will be implemented in a future PR.

class DataSourceReader(ABC):
    ...

    def pushFilters(self, filters: List["Filter"]) -> Iterable["Filter"]:
        """
        Called with the list of filters that can be pushed down to the data source.

        The list of filters should be interpreted as the AND of the elements.

        Filter pushdown allows the data source to handle a subset of filters. This
        can improve performance by reducing the amount of data that needs to be
        processed by Spark.

        This method is called once during query planning. By default, it returns
        all filters, indicating that no filters can be pushed down. Subclasses can
        override this method to implement filter pushdown.

        It's recommended to implement this method only for data sources that natively
        support filtering, such as databases and GraphQL APIs.

        .. versionadded:: 4.1.0

        Parameters
        ----------
        filters : list of :class:`Filter`\\s

        Returns
        -------
        iterable of :class:`Filter`\\s
            Filters that still need to be evaluated by Spark after the data source
            scan. This includes unsupported filters and partially pushed filters.
            Every returned filter must be one of the input filters by reference.

        Side effects
        ------------
        This method is allowed to modify `self`. The object must remain picklable.
        Modifications to `self` are visible to the `partitions()` and `read()` methods.

        Examples
        --------
        Example filters and the resulting arguments passed to pushFilters:

        +-------------------------------+---------------------------------------------+
        | Filters                       | Pushdown Arguments                          |
        +-------------------------------+---------------------------------------------+
        | `a = 1 and b = 2`             | `[EqualTo(("a",), 1), EqualTo(("b",), 2)]`  |
        | `a = 1 or b = 2`              | `[]`                                        |
        | `a = 1 or (b = 2 and c = 3)`  | `[]`                                        |
        | `a = 1 and (b = 2 or c = 3)`  | `[EqualTo(("a",), 1)]`                      |
        +-------------------------------+---------------------------------------------+

        Implement pushFilters to support EqualTo filters only:

        >>> def pushFilters(self, filters):
        ...     for filter in filters:
        ...         if isinstance(filter, EqualTo):
        ...             # Save supported filter for handling in partitions() and read()
        ...             self.filters.append(filter)
        ...         else:
        ...             # Unsupported filter
        ...             yield filter
        """
        return filters
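
For illustration, here is a minimal end-to-end sketch of a data source whose reader implements pushFilters. Everything in it is hypothetical: the class names and sample data are made up, and it assumes that EqualTo is importable from pyspark.sql.datasource (added by the follow-up serialization PR) and exposes the attribute and value fields shown in the examples above.

```python
from pyspark.sql.datasource import DataSource, DataSourceReader, EqualTo


class FakeReader(DataSourceReader):
    ROWS = [(1, "a"), (2, "b"), (3, "c")]

    def __init__(self):
        self.pushed = []  # filters this reader will handle itself

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo) and f.attribute == ("id",):
                self.pushed.append(f)  # handle `id = <value>` in read()
            else:
                yield f  # everything else is still evaluated by Spark

    def read(self, partition):
        for id_, value in self.ROWS:
            # Apply the pushed filters during the scan itself.
            if all(id_ == f.value for f in self.pushed):
                yield (id_, value)


class FakeDataSource(DataSource):
    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "id int, value string"

    def reader(self, schema):
        return FakeReader()


# Usage (assumes an active SparkSession named `spark`):
# spark.dataSource.register(FakeDataSource)
# spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")
# spark.read.format("fake").load().filter("id = 1").show()
```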

Roadmap

  • (this PR) Add filter pushdown API
  • Implement filter serialization and more filter types
  • Add column pruning API

Suggested review order (from high level to details)

  1. datasource.py: add filter pushdown to Python Data Source API
  2. test_python_datasource.py: tests for filter pushdown
  3. PythonScanBuilder.scala: implement filter pushdown API in Scala
  4. UserDefinedPythonDataSource.scala, data_source_pushdown_filters.py: communication between Python and Scala and filter pushdown logic
    • Note that the current filter serialization is a placeholder. An upcoming PR will implement the actual serialization.

Changes to interactions between Python and Scala

Original sequence:

sequenceDiagram
    # autonumber
    participant S as Data Sources API
    participant D as PythonDataSourceV2
    participant P as Python Worker
    participant U as User Implementation

    S ->> D: PythonDataSourceV2.inferSchema(options)
    
    D ->+ P: create_data_source.py
    D -->> P: pickled DS class, name,<br/>schema, options
    P ->+ U: unpickle DS class
    P ->> U: DataSource(options)
    U -->> P: DS instance
    U ->- P: pickle DS instance
    P -->>- D: pickled DS, schema

    D -->> S: schema
    S ->> D: PythonDataSourceV2.getTable(...)
    D -->> S: PythonTable
    S ->> D: PythonTable.newScanBuilder(options)
    D -->> S: PythonScanBuilder
    S ->> D: PythonScanBuilder.build()
    D -->> S: PythonScan
    S ->> D: PythonScan.toBatch()
    D -->> S: PythonBatch
    S ->> D: PythonBatch.planInputPartitions()

    D ->+ P: plan_data_source_read.py
    D -->> P: pickled DS, schema, ...
    P ->+ U: unpickle DS
    P ->> U: DS.reader(schema)
    U -->> P: reader
    P ->> U: reader.partitions()
    U -->> P: partitions
    U ->- P: pickle reader
    P -->>- D: pickled read,<br/>pickled partitions

    D -->> S: partitions

Updated sequence (new interactions are highlighted in yellow):

sequenceDiagram
    # autonumber
    participant S as Data Sources API
    participant D as PythonDataSourceV2
    participant P as Python Worker
    participant U as User Implementation

    S ->> D: PythonDataSourceV2.inferSchema(options)
    
    D ->+ P: create_data_source.py
    D -->> P: pickled DS class, name,<br/>schema, options
    P ->+ U: unpickle DS class
    P ->> U: DataSource(options)
    U -->> P: DS instance
    U ->- P: pickle DS instance
    P -->>- D: pickled DS, schema

    D -->> S: schema
    S ->> D: PythonDataSourceV2.getTable(...)
    D -->> S: PythonTable
    S ->> D: PythonTable.newScanBuilder(options)
    D -->> S: PythonScanBuilder

    rect rgb(255, 252, 238)
    S ->> D: PythonScanBuilder.pushFilters(filters)

    note right of D: Pushdown filters
    note right of D: Only simple filters are serialized<br/> and passed to Python
    note right of D: Other more complex filters are<br/> directly marked as unsupported
    D ->+ P: data_source_pushdown_filters.py
    D -->> P: pickled DS, filters, schema
    P ->+ U: unpickle DS
    P ->> U: DS.reader(schema)
    U -->> P: DataSourceReader
    P ->> U: reader.pushFilters(filters)
    U -->> P: unsupported filters
    U ->- P: pickle DS with <br/>monkey patched reader()
    P -->>- D: pickled DS, supported filters

    D -->> S: unsupported filters, supported filters

    S ->> D: PythonScanBuilder.pushedFilters()
    D -->> S: supported filters
    end

    S ->> D: PythonScanBuilder.build()
    D -->> S: PythonScan
    S ->> D: PythonScan.toBatch()
    D -->> S: PythonBatch
    S ->> D: PythonBatch.planInputPartitions()

    D ->+ P: plan_data_source_read.py
    D -->> P: pickled DS, schema, ...
    P ->+ U: unpickle DS
    P ->> U: DS.reader(schema)
    U -->> P: DataSourceReader
    P ->> U: reader.partitions()
    U -->> P: partitions
    U ->- P: pickle read function
    P -->>- D: pickled read,<br/>pickled partitions

    D -->> S: partitions

Why are the changes needed?

Filter pushdown reduces the amount of data produced by the reader by filtering rows directly in the data source scan, which can improve query performance. This PR implements filter pushdown for the Python Data Sources API using the existing Scala DS filter pushdown API. An upcoming PR will implement the actual filter types and the serialization of filters.

Does this PR introduce any user-facing change?

Yes. A new API is added. See datasource.py for details.

The new API is optional to implement. If not implemented, the reader will behave as before.

The feature is also controlled by the new spark.sql.python.filterPushdown.enabled configuration, which is disabled by default.
If the conf is enabled, the new code path for filter pushdown is used. Otherwise, the code path is skipped, and an exception is thrown if the user implements DataSourceReader.pushFilters() so that the implementation is not silently ignored.
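
For example, this is all that is needed to opt in (a minimal snippet, assuming an active SparkSession named `spark`):

```python
# Enable filter pushdown for Python Data Sources (disabled by default).
# Without this, an implemented pushFilters() raises an error instead of
# being silently ignored.
spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")
```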

How was this patch tested?

Tests added to test_python_datasource.py to check that (a minimal sketch of one such test follows the list):

  • pushed filters are not reapplied by Spark
  • unsupported filters are applied by Spark
  • pushdown happens before partitions
  • reader state is preserved after pushdown
  • pushdown is not called if no filters are present
  • conf is respected
  • ...
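
A minimal sketch of one such check, reusing the hypothetical FakeDataSource sketched earlier in this description (the test name and assertion are illustrative, not the actual tests added in test_python_datasource.py):

```python
import unittest

from pyspark.sql import SparkSession


class FilterPushdownSketchTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").getOrCreate()
        cls.spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")
        cls.spark.dataSource.register(FakeDataSource)  # from the sketch above

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_unsupported_filter_still_applied_by_spark(self):
        df = self.spark.read.format("fake").load().filter("id = 1 AND value = 'a'")
        # `id = 1` may be pushed down; `value = 'a'` is returned as unsupported,
        # so Spark must still apply it after the scan.
        self.assertEqual([tuple(r) for r in df.collect()], [(1, "a")])
```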

Was this patch authored or co-authored using generative AI tooling?

No

@wengh force-pushed the pyds-filter-pushdown branch 6 times, most recently from 9d3bbc2 to d4a757a on February 15, 2025 01:02
@wengh changed the title from "[WIP][PYTHON] Add filter pushdown API to Python Data Sources" to "[SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources" on Feb 20, 2025
@wengh marked this pull request as ready for review on February 20, 2025 21:48
@wengh force-pushed the pyds-filter-pushdown branch from 6cff481 to b81c535 on February 24, 2025 23:34
@wengh marked this pull request as draft on February 26, 2025 23:15
@wengh changed the title from "[SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources" to "[WIP][SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources" on Feb 26, 2025
@wengh marked this pull request as ready for review on February 27, 2025 22:52
@wengh requested a review from allisonwang-db on February 27, 2025 22:52
@wengh changed the title from "[WIP][SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources" to "[SPARK-51271][PYTHON] Add filter pushdown API to Python Data Sources" on Feb 28, 2025
Review thread on PythonScanBuilder.scala (excerpt):

        options: CaseInsensitiveStringMap)
      extends ScanBuilder
      with SupportsPushDownFilters {
      override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options)

@beliefer (Contributor) commented Mar 1, 2025

Because the Python data source is new, we should use SupportsPushDownV2Filters first.

@wengh (Contributor, Author) replied:

We decided to only support V1 filters to keep the Python API simple.

@cloud-fan (Contributor) commented:

For DS v2, the scan workflow is:

  1. analyzer gets Table from TableProvider, and puts it in DataSourceV2Relation (for batch scan) or StreamingRelationV2 (for streaming scan). The Table reports schema because the relation logical plan needs it.
  2. optimizer gets Scan from Table by doing operator pushdown, and turns the relation into DataSourceV2ScanRelation. (streaming does not support filter pushdown yet)
  3. planner gets Batch or Stream from Scan to build the physical scan node.

The Python data source API is a bit different from DS v2 and there is no 1-to-1 mapping. I think the current mapping is:

  1. analyzer gets pickled python DataSource instance and keeps it in PythonDataSourceV2 which extends TableProvider. The PythonTable (extends Table) simply references PythonDataSourceV2.
  2. optimizer gets PythonScan (extends Scan) from PythonTable. PythonScan also simply references PythonTable and does nothing else.
  3. planner gets PythonBatch (extends Batch) from PythonScan, which does some real job: launch a python worker to create the python batch reader and pickle it. The pickled instance is kept in PythonBatch.

Now, to push down filters, we need to create the Python batch reader earlier, which means one more round of Python worker communication in the optimizer. I'm wondering: once we finish pushdown, shall we do the planning work immediately and keep PythonDataSourceReadInfo in PythonScan?


Review thread on the pushFilters implementation in PythonScanBuilder.scala (excerpt):

      // Optionally called by DSv2 once to push down filters before the scan is built.
      override def pushFilters(filters: Array[Filter]): Array[Filter] = {
        if (!SQLConf.get.pythonFilterPushDown) {

Contributor:

It means we should always push down filters if PythonScanBuilder supports SupportsPushDownFilters.
Why do we need this config?

@wengh (Contributor, Author) replied:

We'd like to avoid the new code path (serializing filters, running a new Python worker, ...) for existing Python data sources that don't implement pushdown. So in case there's a crash or a performance issue in the new code path, its impact is limited.

However, we currently don't have a good way to detect whether the user has implemented pushFilters() in the Python DataSourceReader before ScanBuilder.pushFilters() is called. This is because we don't know whether it's a streaming read or a batch read at this point (the optimizer knows, but the data source doesn't get this info), so it's not safe to call Python DataSource.reader() to get the batch reader instance.

So we instead add a conf to turn off the new code path. However, if the user implements pushFilters() and this conf is disabled, we throw an error to let the user know that they must turn on the conf to enable filter pushdown.

In the future, if we figure out how to check whether the Python reader implements pushFilters, we can enable this conf by default and eventually deprecate it.
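
For what it's worth, the check itself could be a simple method-identity comparison once a reader instance is available; the hard part described above is safely obtaining that instance during planning. A hypothetical sketch (not part of this PR):

```python
from pyspark.sql.datasource import DataSourceReader


def implements_push_filters(reader: DataSourceReader) -> bool:
    # True if the subclass overrides the default no-op pushFilters
    # inherited from DataSourceReader.
    return type(reader).pushFilters is not DataSourceReader.pushFilters
```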

Contributor:

Thank you for the explanation.

@wengh (Contributor, Author) commented Mar 7, 2025

@cloud-fan

> Now, to push down filters, we need to create the Python batch reader earlier, which means one more round of Python worker communication in the optimizer. I'm wondering: once we finish pushdown, shall we do the planning work immediately and keep PythonDataSourceReadInfo in PythonScan?

Good idea to avoid the extra worker round trip. I think that would require some refactoring of the plan_read worker, so I'll implement it in a new PR since this PR is already very large.

Also, when we add column pruning support, we should get partitions in the column pruning worker rather than in the filter pushdown worker.

@wengh requested review from cloud-fan and beliefer on March 7, 2025 23:27
@beliefer (Contributor) left a comment:

LGTM except for some minor comments.

@beliefer (Contributor) commented:

cc @cloud-fan @allisonwang-db

@allisonwang-db (Contributor) left a comment:

Looks good! Left some minor comments

Review thread on the new conf in SQLConf.scala (excerpt):

    @@ -4673,6 +4673,13 @@ object SQLConf {
        .booleanConf
        .createWithDefault(false)

      val PYTHON_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.python.filterPushdown.enabled")
        .doc("When true, enable filter pushdown to Python datasource, at the cost of running " +
          "Python worker one additional time during planning.")

Contributor:

do we still have additional planning now?

@wengh (Contributor, Author) replied:

Currently this is still true. I have a separate PR to combine filter pushdown & plan read.


wengh added 3 commits March 19, 2025 10:25
  • add docstring
  • port fixes from serialization pr
  • monkey patch data source to avoid changing existing code
  • remove worker_main
  • improve documentation and rename to be consistent with DSv2
  • add comments
  • fix lint
  • add pushed filter info to plan description
  • update error message
  • conf for filter pushdown
  • check that pushFilters is called from explain()
  • address review
  • Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala (Co-authored-by: Jiaan Geng <[email protected]>)
  • Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonDataSourceV2.scala (Co-authored-by: Jiaan Geng <[email protected]>)
  • use withSQLConf
  • revert PythonScan description change
@wengh force-pushed the pyds-filter-pushdown branch from 59d48d7 to 2c584a1 on March 19, 2025 17:26
@allisonwang-db (Contributor) commented:

Thanks, merging to master.

cloud-fan pushed a commit that referenced this pull request Mar 26, 2025
…ter pushdown

Follow up of #49961

### What changes were proposed in this pull request?

This PR adds the serialization and deserialization required to pass V1 data source filters from JVM to Python. Also adds the equivalent Python dataclass representation of the filters.

To ensure that filter values are properly converted to Python values, we use `VariantVal` to serialize catalyst values into binary, then deserialize into Python `VariantVal`, then convert to Python values.

#### Examples

Supported filters

| SQL filter          | Representation                             |
|---------------------|--------------------------------------------|
| `a.b.c = 1`         | `EqualTo(("a", "b", "c"), 1)`              |
| `a = 1`             | `EqualTo(("a",), 1)`                       |
| `a = 'hi'`          | `EqualTo(("a",), "hi")`                    |
| `a = array(1, 2)`   | `EqualTo(("a",), [1, 2])`                  |
| `a`                 | `EqualTo(("a",), True)`                    |
| `not a`             | `Not(EqualTo(("a",), True))`               |
| `a <> 1`            | `Not(EqualTo(("a",), 1))`                  |
| `a > 1`             | `GreaterThan(("a",), 1)`                   |
| `a >= 1`            | `GreaterThanOrEqual(("a",), 1)`            |
| `a < 1`             | `LessThan(("a",), 1)`                      |
| `a <= 1`            | `LessThanOrEqual(("a",), 1)`               |
| `a in (1, 2, 3)`    | `In(("a",), (1, 2, 3))`                    |
| `a is null`         | `IsNull(("a",))`                           |
| `a is not null`     | `IsNotNull(("a",))`                        |
| `a like 'abc%'`     | `StringStartsWith(("a",), "abc")`          |
| `a like '%abc'`     | `StringEndsWith(("a",), "abc")`            |
| `a like '%abc%'`    | `StringContains(("a",), "abc")`            |

Unsupported filters
- `a = b`
- `f(a, b) = 1`
- `a % 2 = 1`
- `a[0] = 1`
- `a < 0 or a > 1`
- `a like 'c%c%'`
- `a ilike 'hi'`
- `a = 'hi' collate zh`
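
As an illustration of how a reader for a source that natively supports filtering (e.g. a database) might consume these filters, here is a hedged sketch of a pushFilters implementation that translates a few of the filter types above into a SQL WHERE fragment. Class and field names follow the table above; the naive value quoting and the where_clause attribute are illustrative only.

```python
from pyspark.sql.datasource import (
    EqualTo,
    GreaterThan,
    IsNull,
    LessThan,
    StringStartsWith,
)


def _to_sql(f):
    """Translate a pushed filter into a SQL snippet, or None if unsupported."""
    col = ".".join(f.attribute)  # e.g. ("a", "b", "c") -> "a.b.c"
    if isinstance(f, EqualTo):
        return f"{col} = {f.value!r}"
    if isinstance(f, GreaterThan):
        return f"{col} > {f.value!r}"
    if isinstance(f, LessThan):
        return f"{col} < {f.value!r}"
    if isinstance(f, IsNull):
        return f"{col} IS NULL"
    if isinstance(f, StringStartsWith):
        return f"{col} LIKE {f.value + '%'!r}"
    return None


def pushFilters(self, filters):  # intended as a DataSourceReader method
    predicates = []
    for f in filters:
        sql = _to_sql(f)
        if sql is None:
            yield f  # unsupported: Spark evaluates it after the scan
        else:
            predicates.append(sql)
    # Used later by read() when building the query sent to the backing store.
    self.where_clause = " AND ".join(predicates)
```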

### Why are the changes needed?

The base PR #49961 only supported EqualTo with integer values. This PR adds support for many other useful filter types, making the Python Data Source filter pushdown API actually useful.

### Does this PR introduce _any_ user-facing change?

Yes. Python Data Source now supports more pushdown filter types.

### How was this patch tested?

End-to-end tests in `test_python_datasource.py`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50252 from wengh/pyds-filter-serialization.

Authored-by: Haoyu Weng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
allisonwang-db pushed a commit that referenced this pull request Mar 27, 2025
… workers

Follow up of #49961

### What changes were proposed in this pull request?

As pointed out by #49961 (comment), at the time of filter pushdown we already have enough information to also plan read partitions. So this PR changes the filter pushdown worker to also get partitions, reducing the number of exchanges between Python and Scala.

Changes:
- Extract part of `plan_data_source_read.py` that is responsible for sending the partitions and the read function to JVM.
- Use the extracted logic to also send the partitions and read function when doing filter pushdown in `data_source_pushdown_filters.py`.
- Update the Scala code accordingly.

### Why are the changes needed?

To improve Python Data Source performance when filter pushdown configuration is enabled.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests in `test_python_datasource.py`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50340 from wengh/pyds-combine-pushdown-plan.

Authored-by: Haoyu Weng <[email protected]>
Signed-off-by: Allison Wang <[email protected]>