Add statistics_truncate_length parquet writer config (#14782)
* Add statistics_truncate_length parquet writer config

* test fixes

---------

Co-authored-by: Andrew Lamb <[email protected]>
akoshchiy and alamb authored Feb 25, 2025
1 parent 2d57a0b commit ce14fbc
Showing 11 changed files with 71 additions and 0 deletions.
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -503,6 +503,10 @@ config_namespace! {
/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = Some(64)

/// (writing) Sets statistics truncate length. If NULL, uses
/// default parquet writer setting
pub statistics_truncate_length: Option<usize>, default = None

/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = 20_000

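For context (not part of the diff), here is a minimal sketch of setting the new option programmatically; it assumes the `datafusion` crate's `SessionConfig`/`SessionContext` API and uses 64 as an arbitrary example value.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn build_context() -> SessionContext {
    let mut config = SessionConfig::new();
    // Truncate written min/max statistics to at most 64 bytes;
    // None (the default) defers to the parquet writer's own setting.
    config.options_mut().execution.parquet.statistics_truncate_length = Some(64);
    SessionContext::new_with_config(config)
}
```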
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -219,6 +219,7 @@ impl ParquetOptions {
max_row_group_size,
created_by,
column_index_truncate_length,
statistics_truncate_length,
data_page_row_count_limit,
encoding,
bloom_filter_on_write,
@@ -255,6 +256,7 @@ impl ParquetOptions {
.set_max_row_group_size(*max_row_group_size)
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_statistics_truncate_length(*statistics_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_on_write);

@@ -491,6 +493,7 @@ mod tests {
max_row_group_size: 42,
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
statistics_truncate_length: Some(42),
data_page_row_count_limit: 42,
encoding: Some("BYTE_STREAM_SPLIT".into()),
bloom_filter_on_write: !defaults.bloom_filter_on_write,
@@ -587,6 +590,7 @@ mod tests {
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
column_index_truncate_length: props.column_index_truncate_length(),
statistics_truncate_length: props.statistics_truncate_length(),
data_page_row_count_limit: props.data_page_row_count_limit(),

// global options which set the default column props
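The builder call added above comes from the parquet crate; as a hedged illustration of what the new option ultimately controls, the equivalent direct usage looks roughly like this (the length 64 is illustrative):

```rust
use parquet::file::properties::WriterProperties;

fn writer_properties() -> WriterProperties {
    WriterProperties::builder()
        // Some(n) truncates min/max statistics values to at most n bytes;
        // None keeps the parquet writer default, which is what DataFusion's NULL maps to.
        .set_statistics_truncate_length(Some(64))
        .build()
}
```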
4 changes: 4 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -522,6 +522,10 @@ message ParquetOptions {
uint64 column_index_truncate_length = 17;
}

oneof statistics_truncate_length_opt {
uint64 statistics_truncate_length = 31;
}

oneof encoding_opt {
string encoding = 19;
}
6 changes: 6 additions & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -952,6 +952,12 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
})
.unwrap_or(None),
statistics_truncate_length: value
.statistics_truncate_length_opt.as_ref()
.map(|opt| match opt {
protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
})
.unwrap_or(None),
data_page_row_count_limit: value.data_page_row_count_limit as usize,
encoding: value
.encoding_opt.clone()
25 changes: 25 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
@@ -3133,6 +3133,7 @@ impl serde::Serialize for Field {
}
}
impl<'de> serde::Deserialize<'de> for Field {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
@@ -4968,6 +4969,9 @@ impl serde::Serialize for ParquetOptions {
if self.column_index_truncate_length_opt.is_some() {
len += 1;
}
if self.statistics_truncate_length_opt.is_some() {
len += 1;
}
if self.encoding_opt.is_some() {
len += 1;
}
@@ -5100,6 +5104,15 @@ impl serde::Serialize for ParquetOptions {
}
}
}
if let Some(v) = self.statistics_truncate_length_opt.as_ref() {
match v {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("statisticsTruncateLength", ToString::to_string(&v).as_str())?;
}
}
}
if let Some(v) = self.encoding_opt.as_ref() {
match v {
parquet_options::EncodingOpt::Encoding(v) => {
@@ -5183,6 +5196,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"maxStatisticsSize",
"column_index_truncate_length",
"columnIndexTruncateLength",
"statistics_truncate_length",
"statisticsTruncateLength",
"encoding",
"bloom_filter_fpp",
"bloomFilterFpp",
@@ -5218,6 +5233,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
StatisticsEnabled,
MaxStatisticsSize,
ColumnIndexTruncateLength,
StatisticsTruncateLength,
Encoding,
BloomFilterFpp,
BloomFilterNdv,
@@ -5268,6 +5284,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"statisticsEnabled" | "statistics_enabled" => Ok(GeneratedField::StatisticsEnabled),
"maxStatisticsSize" | "max_statistics_size" => Ok(GeneratedField::MaxStatisticsSize),
"columnIndexTruncateLength" | "column_index_truncate_length" => Ok(GeneratedField::ColumnIndexTruncateLength),
"statisticsTruncateLength" | "statistics_truncate_length" => Ok(GeneratedField::StatisticsTruncateLength),
"encoding" => Ok(GeneratedField::Encoding),
"bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp),
"bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv),
@@ -5316,6 +5333,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut statistics_enabled_opt__ = None;
let mut max_statistics_size_opt__ = None;
let mut column_index_truncate_length_opt__ = None;
let mut statistics_truncate_length_opt__ = None;
let mut encoding_opt__ = None;
let mut bloom_filter_fpp_opt__ = None;
let mut bloom_filter_ndv_opt__ = None;
@@ -5491,6 +5509,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
column_index_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(x.0));
}
GeneratedField::StatisticsTruncateLength => {
if statistics_truncate_length_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("statisticsTruncateLength"));
}
statistics_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(x.0));
}
GeneratedField::Encoding => {
if encoding_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("encoding"));
@@ -5538,6 +5562,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
statistics_enabled_opt: statistics_enabled_opt__,
max_statistics_size_opt: max_statistics_size_opt__,
column_index_truncate_length_opt: column_index_truncate_length_opt__,
statistics_truncate_length_opt: statistics_truncate_length_opt__,
encoding_opt: encoding_opt__,
bloom_filter_fpp_opt: bloom_filter_fpp_opt__,
bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
9 changes: 9 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
@@ -794,6 +794,10 @@ pub struct ParquetOptions {
pub column_index_truncate_length_opt: ::core::option::Option<
parquet_options::ColumnIndexTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")]
pub statistics_truncate_length_opt: ::core::option::Option<
parquet_options::StatisticsTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::EncodingOpt", tags = "19")]
pub encoding_opt: ::core::option::Option<parquet_options::EncodingOpt>,
#[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")]
@@ -833,6 +837,11 @@ pub mod parquet_options {
#[prost(uint64, tag = "17")]
ColumnIndexTruncateLength(u64),
}
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum StatisticsTruncateLengthOpt {
#[prost(uint64, tag = "31")]
StatisticsTruncateLength(u64),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum EncodingOpt {
#[prost(string, tag = "19")]
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
@@ -823,6 +823,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
max_row_group_size: value.max_row_group_size as u64,
created_by: value.created_by.clone(),
column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
data_page_row_count_limit: value.data_page_row_count_limit as u64,
encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
bloom_filter_on_read: value.bloom_filter_on_read,
9 changes: 9 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -794,6 +794,10 @@ pub struct ParquetOptions {
pub column_index_truncate_length_opt: ::core::option::Option<
parquet_options::ColumnIndexTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")]
pub statistics_truncate_length_opt: ::core::option::Option<
parquet_options::StatisticsTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::EncodingOpt", tags = "19")]
pub encoding_opt: ::core::option::Option<parquet_options::EncodingOpt>,
#[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")]
@@ -833,6 +837,11 @@ pub mod parquet_options {
#[prost(uint64, tag = "17")]
ColumnIndexTruncateLength(u64),
}
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum StatisticsTruncateLengthOpt {
#[prost(uint64, tag = "31")]
StatisticsTruncateLength(u64),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum EncodingOpt {
#[prost(string, tag = "19")]
6 changes: 6 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
@@ -394,6 +394,9 @@ impl TableParquetOptionsProto {
column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
}),
statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
}),
data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
encoding_opt: global_options.global.encoding.map(|encoding| {
parquet_options::EncodingOpt::Encoding(encoding)
@@ -487,6 +490,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
}),
statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
}),
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -218,6 +218,7 @@ datafusion.execution.parquet.schema_force_view_types true
datafusion.execution.parquet.skip_arrow_metadata false
datafusion.execution.parquet.skip_metadata true
datafusion.execution.parquet.statistics_enabled page
datafusion.execution.parquet.statistics_truncate_length NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
@@ -313,6 +314,7 @@ datafusion.execution.parquet.schema_force_view_types true (reading) If true, par
datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.statistics_truncate_length NULL (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
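A hedged sketch of inspecting the new setting at runtime, in the spirit of the information_schema test above; it assumes a `SessionContext` with the information_schema enabled and an async runtime such as tokio.

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

async fn show_truncate_length() -> Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);
    // SHOW surfaces the same row that information_schema.df_settings lists for this key.
    ctx.sql("SHOW datafusion.execution.parquet.statistics_truncate_length")
        .await?
        .show()
        .await?;
    Ok(())
}
```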
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -70,6 +70,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. |
| datafusion.execution.parquet.created_by | datafusion version 45.0.0 | (writing) Sets "created by" property |
| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length |
| datafusion.execution.parquet.statistics_truncate_length | NULL | (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting |
| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page |
| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting |
| datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files |
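Finally, a hedged sketch of setting the documented key through SQL at runtime, mirroring the table entry above; it assumes an existing `SessionContext` and an async runtime.

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn set_truncate_length(ctx: &SessionContext) -> Result<()> {
    // Equivalent to setting ParquetOptions::statistics_truncate_length = Some(64).
    ctx.sql("SET datafusion.execution.parquet.statistics_truncate_length = 64")
        .await?;
    Ok(())
}
```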
