[WIP] Experiment with DataFusion against Arrow with Extension DataType support #15663

Draft · wants to merge 3 commits into base: main
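This branch builds DataFusion against an experimental arrow-rs branch whose `DataType` gains an `Extension` variant; the pattern throughout the diff is to either reject the new variant explicitly or fall back to the extension's storage type. A minimal sketch of that fallback, assuming the variant exposes a `storage_type()` accessor as the `datafusion/common` changes suggest (the helper itself is illustrative, not part of this PR):

```rust
use arrow_schema::DataType;

// Sketch only: `DataType::Extension` exists on the experimental
// `type-extension-maybe` branch of arrow-rs, not in released arrow crates.
// `storage_backed_type` is an illustrative helper, not code from this PR.
fn storage_backed_type(data_type: &DataType) -> &DataType {
    match data_type {
        // Type resolution sees through the extension wrapper and keeps
        // operating on the underlying storage type.
        DataType::Extension(ext) => ext.storage_type(),
        other => other,
    }
}
```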
350 changes: 203 additions & 147 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
@@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.17", default-features = false }
arrow = { version = "54.3.1", features = [
arrow = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", features = [
"prettyprint",
"chrono-tz",
] }
arrow-buffer = { version = "54.3.0", default-features = false }
arrow-flight = { version = "54.3.1", features = [
arrow-buffer = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", default-features = false }
arrow-flight = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "54.3.0", default-features = false, features = [
arrow-ipc = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "54.3.0", default-features = false }
arrow-schema = { version = "54.3.0", default-features = false }
arrow-ord = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", default-features = false }
arrow-schema = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", default-features = false }
async-trait = "0.1.88"
bigdecimal = "0.4.8"
bytes = "1.10"
@@ -147,9 +147,9 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.8.0"
itertools = "0.14"
log = "^0.4"
object_store = { version = "0.11.0", default-features = false }
object_store = { version = "0.12.0", default-features = false, features = ["fs"] }
parking_lot = "0.12"
parquet = { version = "54.3.1", default-features = false, features = [
parquet = { git = "https://github.com/paleolimbot/arrow-rs.git", branch = "type-extension-maybe", default-features = false, features = [
"arrow",
"async",
"object_store",
@@ -159,7 +159,7 @@ pbjson-types = "0.7"
# Should match arrow-flight's version of prost.
insta = { version = "1.41.1", features = ["glob", "filters"] }
prost = "0.13.1"
rand = "0.8.5"
rand = { version = "0.9", features = ["std_rng"] }
recursive = "0.1.1"
regex = "1.8"
rstest = "0.24.0"
2 changes: 1 addition & 1 deletion benchmarks/src/cancellation.rs
@@ -38,7 +38,7 @@ use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::async_writer::ParquetObjectWriter;
use parquet::arrow::AsyncArrowWriter;
use rand::distributions::Alphanumeric;
use rand::distr::Alphanumeric;
use rand::rngs::ThreadRng;
use rand::Rng;
use structopt::StructOpt;
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -63,7 +63,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.23.5", optional = true }
pyo3 = { version = "0.24.1", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -428,7 +428,7 @@ config_namespace! {
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
/// another to fetch the metadata length encoded in the footer
pub metadata_size_hint: Option<usize>, default = None
pub metadata_size_hint: Option<u64>, default = None

/// (reading) If true, filter expressions are applied during the parquet decoding operation to
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
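With this change the parquet metadata prefetch hint is carried as `u64` end to end (the config option here, `ParquetFormat`, and the reader factory further down all switch from `usize`). For reference, a sketch of setting the hint through `ParquetFormat`, whose updated builder method appears later in this diff; the 512 KiB value is only an example:

```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;

// Sketch: on this branch the hint is Option<u64> rather than Option<usize>.
// 512 KiB is an arbitrary example value, not a recommendation.
fn parquet_format_with_hint() -> ParquetFormat {
    ParquetFormat::default().with_metadata_size_hint(Some(512 * 1024))
}
```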
3 changes: 2 additions & 1 deletion datafusion/common/src/scalar/mod.rs
@@ -2103,7 +2103,8 @@ impl ScalarValue {
| DataType::Time64(TimeUnit::Millisecond)
| DataType::RunEndEncoded(_, _)
| DataType::ListView(_)
| DataType::LargeListView(_) => {
| DataType::LargeListView(_)
| DataType::Extension(_) => {
return _not_impl_err!(
"Unsupported creation of {:?} array from ScalarValue {:?}",
data_type,
1 change: 1 addition & 0 deletions datafusion/common/src/types/native.rs
@@ -411,6 +411,7 @@ impl From<DataType> for NativeType {
DataType::Map(field, _) => Map(Arc::new(field.as_ref().into())),
DataType::Dictionary(_, data_type) => data_type.as_ref().clone().into(),
DataType::RunEndEncoded(_, field) => field.data_type().clone().into(),
DataType::Extension(extension) => extension.storage_type().clone().into(),
}
}
}
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -156,7 +156,7 @@ env_logger = { workspace = true }
insta = { workspace = true }
paste = "^1.0"
rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.4.3"
rand_distr = "0.5.1"
regex = { workspace = true }
rstest = { workspace = true }
serde_json = { workspace = true }
6 changes: 3 additions & 3 deletions datafusion/core/benches/parquet_query_sql.rs
@@ -29,9 +29,9 @@ use datafusion_common::instant::Instant;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
use rand::distributions::Alphanumeric;
use rand::prelude::*;
use rand::distr::uniform::SampleUniform;
use rand::distr::Alphanumeric;
use rand::{prelude::*, thread_rng};
use std::fs::File;
use std::io::Read;
use std::ops::Range;
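The rand 0.9 bump in the workspace is what drives these import changes: `rand::distributions` is now `rand::distr`, and `Alphanumeric` / `SampleUniform` moved with it, while `thread_rng()` has a newer `rand::rng()` spelling. A small sketch of the renamed API, assuming rand 0.9 as pinned above:

```rust
use rand::distr::Alphanumeric;
use rand::Rng;

// Sketch for rand 0.9: the `distributions` module is renamed to `distr`;
// sampling a random alphanumeric string otherwise looks the same as before.
fn random_name(len: usize) -> String {
    rand::rng()
        .sample_iter(&Alphanumeric)
        .take(len)
        .map(char::from)
        .collect()
}
```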
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -442,7 +442,7 @@ mod tests {
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: usize::MAX,
size: u64::MAX,
e_tag: None,
version: None,
};
@@ -485,7 +485,7 @@ mod tests {
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: usize::MAX,
size: u64::MAX,
e_tag: None,
version: None,
};
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -103,7 +103,7 @@ mod tests {

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let bytes = self.bytes_to_repeat.clone();
let range = 0..bytes.len() * self.max_iterations;
let range = 0..bytes.len() as u64 * self.max_iterations as u64;
let arc = self.iterations_detected.clone();
let stream = futures::stream::repeat_with(move || {
let arc_inner = arc.clone();
@@ -138,7 +138,7 @@
async fn get_ranges(
&self,
_location: &Path,
_ranges: &[Range<usize>],
_ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
unimplemented!()
}
@@ -153,8 +153,8 @@

fn list(
&self,
_prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
_: Option<&Path>,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
unimplemented!()
}

@@ -371,7 +371,7 @@
let object_meta = ObjectMeta {
location: Path::parse("/")?,
last_modified: DateTime::default(),
size: usize::MAX,
size: u64::MAX,
e_tag: None,
version: None,
};
@@ -429,7 +429,7 @@
let object_meta = ObjectMeta {
location: Path::parse("/")?,
last_modified: DateTime::default(),
size: usize::MAX,
size: u64::MAX,
e_tag: None,
version: None,
};
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -330,8 +330,8 @@ mod tests {

fn list(
&self,
_prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
_: Option<&Path>,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
Box::pin(futures::stream::once(async {
Err(object_store::Error::NotImplemented)
}))
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/mod.rs
@@ -106,7 +106,7 @@ mod tests {
let meta = ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
size: metadata.len() as usize,
size: metadata.len(),
e_tag: None,
version: None,
};
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -305,7 +305,7 @@ impl FileOpener for ArrowOpener {
)?;
// read footer according to footer_len
let get_option = GetOptions {
range: Some(GetRange::Suffix(10 + footer_len)),
range: Some(GetRange::Suffix(10 + footer_len as u64)),
..Default::default()
};
let get_result = object_store
@@ -332,9 +332,9 @@
.iter()
.flatten()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
let block_len =
block.bodyLength() as u64 + block.metaDataLength() as u64;
let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
@@ -364,9 +364,9 @@
let recordbatch_ranges = recordbatches
.iter()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
let block_len =
block.bodyLength() as u64 + block.metaDataLength() as u64;
let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1806,7 +1806,7 @@ mod tests {
#[derive(Debug, Clone)]
struct TrackingParquetFileReaderFactory {
inner: Arc<dyn ParquetFileReaderFactory>,
metadata_size_hint_calls: Arc<Mutex<Vec<Option<usize>>>>,
metadata_size_hint_calls: Arc<Mutex<Vec<Option<u64>>>>,
}

impl TrackingParquetFileReaderFactory {
@@ -1823,7 +1823,7 @@
&self,
partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metadata_size_hint: Option<u64>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
{
@@ -1856,8 +1856,8 @@
let schema = batch.schema();
let name_1 = "test1.parquet";
let name_2 = "test2.parquet";
let total_size_1 = write_batch(name_1, store.clone(), batch.clone()).await;
let total_size_2 = write_batch(name_2, store.clone(), batch.clone()).await;
let total_size_1 = write_batch(name_1, store.clone(), batch.clone()).await as u64;
let total_size_2 = write_batch(name_2, store.clone(), batch.clone()).await as u64;

let reader_factory =
Arc::new(TrackingParquetFileReaderFactory::new(store.clone()));
4 changes: 2 additions & 2 deletions datafusion/core/src/test/object_store.rs
@@ -66,7 +66,7 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
size: metadata.len() as usize,
size: metadata.len(),
e_tag: None,
version: None,
}
@@ -166,7 +166,7 @@ impl ObjectStore for BlockingObjectStore {
fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}

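Most of the churn in these helpers comes from object_store 0.12: `ObjectMeta::size` and byte ranges are now `u64`, and `list()` returns a `'static` stream rather than one borrowing `&self`. A minimal sketch of building an `ObjectMeta` under the new types (path and timestamp are placeholders):

```rust
use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

// Sketch for object_store 0.12: `size` is u64 (it was usize in 0.11), so the
// result of std::fs::Metadata::len() no longer needs an `as usize` cast.
fn example_meta(len: u64) -> ObjectMeta {
    ObjectMeta {
        location: Path::from("data/example.parquet"), // placeholder path
        last_modified: Utc::now(),
        size: len,
        e_tag: None,
        version: None,
    }
}
```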
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/parquet.rs
@@ -102,7 +102,7 @@ impl TestParquetFile {

println!("Generated test dataset with {num_rows} rows");

let size = std::fs::metadata(&path)?.len() as usize;
let size = std::fs::metadata(&path)?.len();

let mut canonical_path = path.canonicalize()?;

1 change: 1 addition & 0 deletions datafusion/datasource-avro/src/avro_to_arrow/schema.rs
@@ -239,6 +239,7 @@ fn default_field_name(dt: &DataType) -> &str {
}
DataType::Decimal128(_, _) => "decimal",
DataType::Decimal256(_, _) => "decimal",
DataType::Extension(_) => unimplemented!("Extension support not implemented"),
}
}

26 changes: 12 additions & 14 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -193,13 +193,13 @@ impl ParquetFormat {
/// another read to fetch the metadata length encoded in the footer.
///
/// - If `None`, defaults to value on `config_options`
pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
pub fn with_metadata_size_hint(mut self, size_hint: Option<u64>) -> Self {
self.options.global.metadata_size_hint = size_hint;
self
}

/// Return the metadata size hint if set
pub fn metadata_size_hint(&self) -> Option<usize> {
pub fn metadata_size_hint(&self) -> Option<u64> {
self.options.global.metadata_size_hint
}

@@ -290,7 +290,7 @@ fn clear_metadata(
async fn fetch_schema_with_location(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
metadata_size_hint: Option<u64>,
) -> Result<(Path, Schema)> {
let loc_path = file.location.clone();
let schema = fetch_schema(store, file, metadata_size_hint).await?;
@@ -735,15 +735,13 @@ impl<'a> ObjectStoreFetch<'a> {
}

impl MetadataFetch for ObjectStoreFetch<'_> {
fn fetch(
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
async {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
async move {
let range_usize: Range<u64> = range.start..range.end;
self.store
.get_range(&self.meta.location, range)
.get_range(&self.meta.location, range_usize)
.await
.map_err(ParquetError::from)
.map_err(|e| ParquetError::External(Box::new(e)))
}
.boxed()
}
@@ -758,13 +756,13 @@
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
size_hint: Option<usize>,
size_hint: Option<u64>,
) -> Result<ParquetMetaData> {
let file_size = meta.size;
let fetch = ObjectStoreFetch::new(store, meta);

ParquetMetaDataReader::new()
.with_prefetch_hint(size_hint)
.with_prefetch_hint(size_hint.map(|n| n.try_into().unwrap()))
.load_and_finish(fetch, file_size)
.await
.map_err(DataFusionError::from)
@@ -774,7 +772,7 @@
async fn fetch_schema(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
metadata_size_hint: Option<u64>,
) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
@@ -792,7 +790,7 @@ pub async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
metadata_size_hint: Option<u64>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
statistics_from_parquet_meta_calc(&metadata, table_schema)
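With these signature changes, callers hand `fetch_parquet_metadata` the hint as `u64`, matching `ObjectMeta::size` from object_store 0.12. A usage sketch (the import paths and the 64 KiB hint are assumptions for illustration, not taken from this diff):

```rust
use datafusion_common::Result;
use datafusion_datasource_parquet::file_format::fetch_parquet_metadata;
use object_store::{ObjectMeta, ObjectStore};
use parquet::file::metadata::ParquetMetaData;

// Sketch: size_hint is now Option<u64>; passing None falls back to the
// two-read footer path described in the doc comment above.
async fn read_metadata(
    store: &dyn ObjectStore,
    meta: &ObjectMeta,
) -> Result<ParquetMetaData> {
    fetch_parquet_metadata(store, meta, Some(64 * 1024)).await
}
```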