Support metadata columns (location, size, last_modified) in ListingTableProvider #15173

Open
phillipleblanc opened this issue Mar 12, 2025 · 9 comments
Labels
enhancement New feature or request

Comments

@phillipleblanc
Contributor

Is your feature request related to a problem or challenge?

The ListingTableProvider in DataFusion provides an implementation of a TableProvider that organizes a collection of (potentially hive partitioned) files in an object store into a single table.

Hive partition columns are injected into the listing table schema even though they don't exist in the physical Parquet files. Similarly, I'd like to be able to ask the ListingTable to inject metadata columns that get their data from the ObjectMeta provided by the object_store crate. Then I can query for and filter on the columns location, size and last_modified.

I'd also like queries that filter on the metadata columns to be able to prune out files, similar to partition pruning. I.e. if I do SELECT * FROM my_listing_table WHERE last_modified > '2025-03-10' then only files that were modified after '2025-03-10' should be passed to the FileScanConfig to be read.
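To make the pruning idea concrete, here is a minimal sketch. It assumes a hypothetical `FileMeta` struct standing in for the `location`, `size` and `last_modified` fields of `ObjectMeta`, with timestamps simplified to seconds since the Unix epoch. It is not the DataFusion implementation, just the shape of the filter that would run before files reach the scan.

```rust
/// Hypothetical stand-in for the `ObjectMeta` fields used by this proposal.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    pub location: String,
    pub size: u64,
    /// Assumption: seconds since the Unix epoch; the real field is a timestamp type.
    pub last_modified: i64,
}

/// Keep only the files modified strictly after `cutoff`, mirroring a
/// `WHERE last_modified > <cutoff>` predicate applied at listing time.
pub fn prune_by_last_modified(files: Vec<FileMeta>, cutoff: i64) -> Vec<FileMeta> {
    files
        .into_iter()
        .filter(|file| file.last_modified > cutoff)
        .collect()
}
```

Only the files that survive this filter would need to be opened and read; the rest are skipped without any I/O on their contents.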

My scenario is that I'd like to efficiently ingest only the files I haven't seen before from an object store bucket, and filtering on last_modified seems like a good solution.

This could potentially fold into the work ongoing in #13975 / #14057 / #14362 to mark these columns as proper system/metadata columns - but it fundamentally isn't blocked on that work. Since this would be an opt-in from the consumer, automatic filtering out on a SELECT * doesn't seem required.

Describe the solution you'd like

A new API on the ListingOptions struct that is passed to a ListingTableConfig which is passed to ListingTable::try_new.

    /// Sets metadata columns on [`ListingOptions`] and returns self.
    ///
    /// "metadata columns" are columns that are computed from the `ObjectMeta` of the files from object store.
    ///
    /// Available metadata columns:
    /// - `location`: The full path to the object
    /// - `last_modified`: The last modified time
    /// - `size`: The size in bytes of the object
    ///
    /// For example, given the following files in object store:
    ///
    /// ```text
    /// /mnt/nyctaxi/tripdata01.parquet
    /// /mnt/nyctaxi/tripdata02.parquet
    /// /mnt/nyctaxi/tripdata03.parquet
    /// ```
    ///
    /// If the `last_modified` field in the `ObjectMeta` for `tripdata01.parquet` is `2024-01-01 12:00:00`,
    /// then the table schema will include a column named `last_modified` with the value `2024-01-01 12:00:00`
    /// for all rows read from `tripdata01.parquet`.
    ///
    /// | <other columns> | last_modified         |
    /// |-----------------|-----------------------|
    /// | ...             | 2024-01-01 12:00:00   |
    /// | ...             | 2024-01-02 15:30:00   |
    /// | ...             | 2024-01-03 09:15:00   |
    ///
    /// # Example
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
    ///
    /// let listing_options = ListingOptions::new(Arc::new(
    ///     ParquetFormat::default()
    ///   ))
    ///   .with_metadata_cols(vec![MetadataColumn::LastModified]);
    ///
    /// assert_eq!(listing_options.metadata_cols, vec![MetadataColumn::LastModified]);
    /// ```
    pub fn with_metadata_cols(mut self, metadata_cols: Vec<MetadataColumn>) -> Self {
        self.metadata_cols = metadata_cols;
        self
    }

The definition for MetadataColumn is a simple enum:

    /// A metadata column that can be used to filter files
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub enum MetadataColumn {
        /// The location of the file in object store
        Location,
        /// The last modified timestamp of the file
        LastModified,
        /// The size of the file in bytes
        Size,
    }

The order of the MetadataColumn passed into with_metadata_cols denotes the order it will appear in the table schema. Metadata columns will be added after partition columns.
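The ordering rule can be sketched as follows. The `name` method and the `table_column_names` helper are hypothetical, introduced only for illustration, and column names are plain strings rather than Arrow fields.

```rust
/// Same shape as the enum proposed in this issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetadataColumn {
    Location,
    LastModified,
    Size,
}

impl MetadataColumn {
    /// Hypothetical helper: the column name exposed in the table schema.
    pub fn name(&self) -> &'static str {
        match self {
            MetadataColumn::Location => "location",
            MetadataColumn::LastModified => "last_modified",
            MetadataColumn::Size => "size",
        }
    }
}

/// Hypothetical helper: file columns first, then partition columns,
/// then metadata columns in the order the caller requested them.
pub fn table_column_names(
    file_cols: &[&str],
    partition_cols: &[&str],
    metadata_cols: &[MetadataColumn],
) -> Vec<String> {
    file_cols
        .iter()
        .map(|c| c.to_string())
        .chain(partition_cols.iter().map(|c| c.to_string()))
        .chain(metadata_cols.iter().map(|m| m.name().to_string()))
        .collect()
}
```

Passing `[Size, Location]` therefore yields `size` before `location`, both after any partition columns.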

Describe alternatives you've considered

I considered what it might look like to make ListingTableProvider more extensible to be able to implement these changes without a core DataFusion change. I wasn't able to come up with anything simpler than the above though.

Another option might be to make a lot of the internals of ListingTableProvider public so that it is easier for people to maintain their own customized versions of ListingTableProvider.

Additional context

I've already implemented this in my project, and I will be upstreaming my change and linking it to this issue. To view what this looks like already implemented, see: spiceai#74

And to see the changes needed to integrate with it from a consuming project, see: spiceai/spiceai#4970 (It is quite contained, which I'm happy with)

This change will have no visible effect on consumers - they need to explicitly opt-in to see the metadata columns.

@phillipleblanc phillipleblanc added the enhancement New feature or request label Mar 12, 2025
@alamb
Contributor

alamb commented Mar 12, 2025

This could potentially fold into the work ongoing in #13975 / #14057 / #14362 to mark these columns as proper system/metadata columns - but it fundamentally isn't blocked on that work. Since this would be an opt-in from the consumer, automatic filtering out on a SELECT * doesn't seem required.

I agree -- it would be amazing if someone could help finish / get that PR over the line. I haven't been following it.

Getting metadata columns in would be sweeeeeet

@alamb
Contributor

alamb commented May 1, 2025

There is another request here that is related I think (basically "row number within the file"):

It seems to me that the existing ListingTableProvider in DataFusion has to walk a fine line between being relatively "simple" and offering additional functionality like this, which, while widely useful, I think could also be implemented outside the core with the existing APIs.

As we add more features to ListingTable it will look more and more like a full featured Table Format, but there are already many implementations of such formats (for example Iceberg, Delta, etc)

Thus I think it might be good to propose creating a datafusion-contrib project to make a more full-featured "TableProvider" that adds new features like:

  1. Additional metadata columns
  2. Support file row index / row id for each file in a ListingTableProvider #15892
  3. etc

@phillipleblanc
Contributor Author

phillipleblanc commented May 2, 2025

Yeah that makes sense. Part of the complexity here is that one of the features needed to make the ListingTableProvider work today (i.e. partition columns) is actually implemented in the core FileStream. And in order to add the metadata columns I want, I need to pipe the ObjectMeta through the FileStream as well.

So it would end up being quite a lot of code duplication to fork out of the core repo to get this working outside of core DataFusion. I do think there is opportunity for some better extensibility/abstractions that would allow people to implement something like this on their own - but I'm not sure what that is yet.

I think a good first step would be figuring out how to abstract out the partition columns so that the existing ListingTable can work without any special code in core.

@alamb
Contributor

alamb commented May 2, 2025

I think a good first step would be figuring out how to abstract out the partition columns so that the existing ListingTable can work without any special code in core.

I agree -- this would make a lot of sense.

Maybe we could do something like pull PartitionColumnProjector out of FileStream:

    /// The partition column projector
    pc_projector: PartitionColumnProjector,

And then making another stream wrapper that did the projecting

    /// Adds partitioning columns to an inner stream
    pub struct PartitionColumnStream<S> {
        pc_projector: PartitionColumnProjector,
        inner: S,
    }

🤔
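A minimal sketch of how such a wrapper might behave, under loud assumptions: `Batch` is a hypothetical stand-in for `RecordBatch`, the `PartitionColumnProjector` here is a simplified toy rather than the DataFusion type, and `Iterator` is used in place of an async `Stream` so the example needs only the standard library.

```rust
/// Hypothetical stand-in for a record batch: named columns of string values.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub columns: Vec<(String, Vec<String>)>,
    pub num_rows: usize,
}

/// Toy projector (not the DataFusion type): appends one constant-valued
/// column per partition key to every batch it sees.
pub struct PartitionColumnProjector {
    pub partition_values: Vec<(String, String)>,
}

impl PartitionColumnProjector {
    pub fn project(&self, mut batch: Batch) -> Batch {
        for (name, value) in &self.partition_values {
            let column = vec![value.clone(); batch.num_rows];
            batch.columns.push((name.clone(), column));
        }
        batch
    }
}

/// Adds partitioning columns to an inner stream (an `Iterator` in this sketch).
pub struct PartitionColumnStream<S> {
    pc_projector: PartitionColumnProjector,
    inner: S,
}

impl<S> PartitionColumnStream<S> {
    pub fn new(pc_projector: PartitionColumnProjector, inner: S) -> Self {
        Self { pc_projector, inner }
    }
}

impl<S: Iterator<Item = Batch>> Iterator for PartitionColumnStream<S> {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        // Pull the next batch from the inner source, then project onto it.
        let batch = self.inner.next()?;
        Some(self.pc_projector.project(batch))
    }
}
```

The point of the design is that the inner source knows nothing about partition columns; all of that logic lives in the wrapper.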

@alamb
Contributor

alamb commented May 2, 2025

Here is another example of complexity related to this type of operation:

(and all the more reason not to make ListingTable more complicated)

@phillipleblanc
Contributor Author

And then making another stream wrapper that did the projecting

Actually, yeah - that would work quite nicely I think. If we changed FileStream's interface from returning RecordBatches to returning a tuple of (RecordBatch, ObjectMeta), then it solves both cases.

i.e. from:

    impl Stream for FileStream {
        type Item = Result<RecordBatch>;
        // ...
    }

to

    impl Stream for FileStream {
        type Item = Result<(RecordBatch, ObjectMeta)>;
        // ...
    }

That would allow for injection of metadata columns in my custom implementation and for parsing out of the partition column values from the location in the ListingTableProvider (instead of having the inner FileStream handle that complexity).
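As a rough illustration of the second half of that idea: once each batch arrives together with its file's location, a wrapper outside FileStream could recover hive-style partition values from the path. The `parse_partition_values` function below is a hypothetical helper written for this sketch, operating on a plain string path rather than an object store path type.

```rust
/// Hypothetical helper: extract hive-style `key=value` pairs from the
/// directory portion of an object location. The final path segment (the
/// file name) is ignored, so an `=` inside a file name is not treated
/// as a partition.
pub fn parse_partition_values(location: &str) -> Vec<(String, String)> {
    // Split off the file name; a location with no '/' has no directories.
    let dir = match location.rsplit_once('/') {
        Some((dir, _file_name)) => dir,
        None => return Vec::new(),
    };

    dir.split('/')
        .filter_map(|segment| segment.split_once('='))
        .filter(|(key, _value)| !key.is_empty())
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
```

A metadata-column wrapper would follow the same pattern, reading `size` and `last_modified` from the accompanying metadata instead of parsing the path.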

@alamb
Contributor

alamb commented May 5, 2025

That would allow for injection of metadata columns in my custom implementation and for parsing out of the partition column values from the location in the ListingTableProvider (instead of having the inner FileStream handle that complexity).

Sounds like a great idea to me. Are you willing to make a PR for it? Or shall we write it up in more detail and try to see if anyone else is interested in helping out?

@phillipleblanc
Contributor Author

I will handle the PR for this. I'll also create a separate issue for tracking.

@adriangb
Contributor

adriangb commented May 7, 2025

I'll point out that I was testing DuckDB and they have this very nice feature:

    D select filename, sum(row_count) as row_count from read_parquet('/Users/adriangb/Downloads/data2/**/*_stats.parquet', filename=true) group by filename order by row_count desc limit 10;
    Binder Error:
    Option filename adds column "filename", but a column with this name is also in the file. Try setting a different name: filename='<filename column name>'

3 participants