Skip to content

[Discussion] Efficient Row Selection for Multi-Engine Support #14816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
alchemist51 opened this issue Feb 21, 2025 · 12 comments
Open

[Discussion] Efficient Row Selection for Multi-Engine Support #14816

alchemist51 opened this issue Feb 21, 2025 · 12 comments

Comments

@alchemist51
Copy link

alchemist51 commented Feb 21, 2025

Background

We have an usecase where data is stored in multiple engines/formats and Parquet is the primary format containing all the data. While text queries are handled by inverted index format, numeric data queries and aggregations are processed via Parquet files. While the file formats are different, the data is sorted and stored in the same order across them.

We are using DataFusion to query Parquet files and wondering if the result of the query can be represented as a bit set of the document position (example below). Bit sets from the different engines can be intersected to identify the documents which meets the criteria. The resulting bit set then can be used to fetch the relevant documents from Parquet.

Example:

Assume we have the following data stored in parquet file:
colA colB
200 Autumn leaves
200 Salty breeze
100 Misty mountains
100 Misty mountains
200 Velvet curtains

For example, assume have an query like SELECT colB where colA = 100

The matching documents can be represented in the form of bitset : 00110 (row number starts from left). We want to use the matching document information collected from any underlying engine to fetch the relevant documents in the parquet file using DataFusion.

What we explored

We explored that one of the ways to fetch specific rows in DataFusion is by creating an access plan and passing it to ParquetExec. Since we need the complete plan, we can't parallelize it and start collecting data from Parquet, which reduces the overall query performance and is also memory-inefficient as we need to iterate the complete stream and convert it to the AccessPlan.

Possible Solution

If there is a way to:
  1. Pass the iterator directly to DataFusion, or
  2. Process the matching rows in batches.
Then it will enable on-demand conversion from the matching rows iterator to RowSelection in DataFusion thus improving efficiency by reducing memory overhead.

Questions

  1. Are there existing mechanisms in DataFusion to handle external iterators or row sources?
  2. What are the best practices for integrating DataFusion with external data sources in a streaming or batched manner?
  3. Are there any plans or ongoing work in the DataFusion project that might address this use case?
  4. Any alternative approaches or design patterns that might help us achieve efficient row selection in our multi-engine implementation?

@alchemist51
Copy link
Author

@alamb @andygrove please provide your opinion on this usecase!

@alamb
Copy link
Contributor

alamb commented Feb 22, 2025

Are there existing mechanisms in DataFusion to handle external iterators or row sources?

There is a PR we are currently working on related to metadata columns (which could provide row ids perhaps)

What are the best practices for integrating DataFusion with external data sources in a streaming or batched manner?

Are there any plans or ongoing work in the DataFusion project that might address this use case?

Any alternative approaches or design patterns that might help us achieve efficient row selection in our multi-engine implementation?

I think you should check out https://github.com/datafusion-contrib/datafusion-federation which has a variety of items that are used for building a federated query engine

@philippemnoel may also have ideas / suggestions for this

@alamb
Copy link
Contributor

alamb commented Feb 22, 2025

We are using DataFusion to query Parquet files and wondering if the result of the query can be represented as a bit set of the document position (example below). Bit sets from the different engines can be intersected to identify the documents which meets the criteria. The resulting bit set then can be used to fetch the relevant documents from Parquet.

I think there are two parts to your question:

  1. Representing the results as a bitset: I think you would have to imlement a custom "pivot" type operation that took row ids somehow and created a bitset from them

  2. Fetching only relevant documents from parquet: the curent reader is efficiently setup to fetch large contiguous blocks of values (RowSelection). @XiangpengHao has been thinking about a bitset representation for selected rows recently so perhaps you can help contribute to making that happen in the parquet reader

@alchemist51
Copy link
Author

Thanks for the response @alamb ! Couple of follow up questions:

There is a PR we are currently working on related to metadata columns (which could provide row ids perhaps)
#14057

Is there any way to get the row_id data for Parquet? Any suggestion to build it? @alamb @chenkovsky

Fetching only relevant documents from parquet: the curent reader is efficiently setup to fetch large contiguous blocks of values (RowSelection). @XiangpengHao has been thinking about a bitset representation for selected rows recently so perhaps you can help contribute to making that happen in the parquet reader

Will be happy to collaborate on it. @XiangpengHao any initial plan or POC you have done for it?

@chenkovsky
Copy link
Contributor

Thanks for the response @alamb ! Couple of follow up questions:

There is a PR we are currently working on related to metadata columns (which could provide row ids perhaps)
#14057

Is there any way to get the row_id data for Parquet? Any suggestion to build it? @alamb @chenkovsky

Fetching only relevant documents from parquet: the curent reader is efficiently setup to fetch large contiguous blocks of values (RowSelection). @XiangpengHao has been thinking about a bitset representation for selected rows recently so perhaps you can help contribute to making that happen in the parquet reader

Will be happy to collaborate on it. @XiangpengHao any initial plan or POC you have done for it?

@Arpit-Bandejiya

I created an example for getting row_id for parquet based on PR #14057. https://github.com/chenkovsky/datafusion/pull/3/files

@bharath-techie
Copy link

Hi @chenkovsky ,
Thanks a ton for quick POC on this. :)

The row ids seems to be specific to each batch and not across the entire parquet file - is my understanding correct ?

Reason is our use case will mainly benefit from parquet file level row ids.

@chenkovsky
Copy link
Contributor

Hi @chenkovsky ,
Thanks a ton for quick POC on this. :)

The row ids seems to be specific to each batch and not across the entire parquet file - is my understanding correct ?

Reason is our use case will mainly benefit from parquet file level row ids.

@bharath-techie yes,This is just an example, not for real situation. If it needs to meet the actual requirements, I will need more time. i think i have to learn more about parquet.

@bharath-techie
Copy link

Thanks @chenkovsky for confirming.

We are new to datafusion , but at high level looks like this feature will need a deeper integration in the ParquetExec flow and we might need changes in ParquetRecordBatchStream in arrow-rs as it performs pruning and at datafusion layer we might not be able to figure out the actual row ids because of it.

Experts can comment on this / see if there are any other ways that they can think of.

@alchemist51
Copy link
Author

alchemist51 commented Feb 25, 2025

Found this PR in arrow-rs : apache/arrow-rs#6624 . @XiangpengHao I see the PR is in draft from sometime. Is there any other way we are trying to do it? Can you please share it.

@XiangpengHao
Copy link
Contributor

XiangpengHao commented Feb 25, 2025

Hi @Arpit-Bandejiya sorry I've been quite busy these days.

If you have a bitmask and want to only read the flagged rows from Parquet, you can directly use ParquetRecordBatchBuilder::with_row_selection: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_row_selection

If you want DataFusion to produce a bitmask for other systems -- I'm not aware of an easy way to do this. But this sounds like a join use case, have you considered adding a row_id column to the parquet files? so that you can select the row_id as the output and join with other systems.

DataFusion has no control over the row id read from Parquet, especially with filter pushdown, where row ids are heavily filtered. Even changing the ParquetRecordBatchStream as @bharath-techie pointed out is not enough, as concurrent reading can happen, it's possible but quite hard to determine the starting row_id of each stream.
In fact, the reader has the freedom to emit rows in any order, as long as they are logically equivalent.

@alchemist51
Copy link
Author

alchemist51 commented Mar 7, 2025

Thanks @XiangpengHao for the response.

If you want DataFusion to produce a bitmask for other systems -- I'm not aware of an easy way to do this. But this sounds like a join use case, have you considered adding a row_id column to the parquet files? so that you can select the row_id as the output and join with other systems.

I'm trying to do it in the same fashion of using row-id, the problem comes for sparse results in different results from engines. For example if one engine iterator is sparse while datafusion is returning almost every row it becomes quite inefficient because essentially it will end up loading all the data from datafusion. The problem aggravates a bit more since we are now fetching one more column aka row_id from the file. Few query engines like lucene support advance seek operation though I'm not sure if that is possible with datafusion or parquet file in general.

Is there any way in datafusion that we get different iterators for each of the file partitions we do when we create the physical plan? I'm thinking to divide the files into multiple partitions which can help me optimize in doing advance seek operation. For example if the next results lies in the next partition we can close the ongoing stream and process the next partition to avoid reading all pages.

Sorry for the late response, was occupied in few other stuff.

@XiangpengHao
Copy link
Contributor

Is there any way in datafusion that we get different iterators for each of the file partitions we do when we create the physical plan?

Not sure if this is what you want, but probably relevant: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/opener.rs#L277

cc @mbutrovich who might also be interested in this discussion

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants