chore: Prepare for DataFusion 45 (bump to DataFusion rev 5592834 + Arrow 54.0.0) #1332

Merged: 21 commits, Jan 30, 2025. Changes shown from 19 commits.
201 changes: 104 additions & 97 deletions native/Cargo.lock

Large diffs are not rendered by default.

30 changes: 15 additions & 15 deletions native/Cargo.toml
@@ -33,21 +33,21 @@ edition = "2021"
rust-version = "1.79"
Comment — @comphead (Contributor), Jan 28, 2025:
This can probably be updated to 1.80 (as in DataFusion) or to 1.81; the PR for that has already been created.

[workspace.dependencies]
arrow = { version = "53.4.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.4.0" }
arrow-buffer = { version = "53.4.0" }
arrow-data = { version = "53.4.0" }
arrow-schema = { version = "53.4.0" }
parquet = { version = "53.4.0", default-features = false, features = ["experimental"] }
datafusion = { version = "44.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { version = "44.0.0", default-features = false }
datafusion-functions = { version = "44.0.0", default-features = false, features = ["crypto_expressions"] }
datafusion-functions-nested = { version = "44.0.0", default-features = false }
datafusion-expr = { version = "44.0.0", default-features = false }
datafusion-expr-common = { version = "44.0.0", default-features = false }
datafusion-execution = { version = "44.0.0", default-features = false }
datafusion-physical-plan = { version = "44.0.0", default-features = false }
datafusion-physical-expr = { version = "44.0.0", default-features = false }
arrow = { version = "54.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "54.0.0" }
arrow-buffer = { version = "54.0.0" }
arrow-data = { version = "54.0.0" }
arrow-schema = { version = "54.0.0" }
parquet = { version = "54.0.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["crypto_expressions"] }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" }
datafusion-comet-proto = { path = "proto", version = "0.6.0" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
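Note: all of the DataFusion crates above are pinned to the same git revision so that Cargo resolves them from a single source; mixing a crates-io 44.0.0 crate with a git-pinned one would otherwise produce two incompatible copies of the same types. Once DataFusion 45.0.0 is published to crates.io, these git pins would presumably collapse back into plain version requirements; a sketch of the expected follow-up (an assumption, not part of this diff):

datafusion = { version = "45.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { version = "45.0.0", default-features = false }
# ...and likewise for the remaining datafusion-* crates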
1 change: 1 addition & 0 deletions native/core/src/execution/jni_api.rs
@@ -473,6 +473,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let start = Instant::now();
let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
.with_exec_id(exec_context_id);
+
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
167 changes: 140 additions & 27 deletions native/core/src/execution/operators/filter.rs
@@ -32,25 +32,28 @@ use arrow::record_batch::RecordBatch;
use arrow_array::{make_array, Array, ArrayRef, BooleanArray, RecordBatchOptions};
use arrow_data::transform::MutableArrayData;
use arrow_schema::ArrowError;
use datafusion::physical_plan::common::can_project;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
-use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
+use datafusion_common::{internal_err, plan_err, project_schema, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
-    analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
+    analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries,
+    PhysicalExpr,
};

use futures::stream::{Stream, StreamExt};
use log::trace;

/// This is a copy of DataFusion's FilterExec with one modification to ensure that input
/// batches are never passed through unchanged. The changes are between the comments
/// `BEGIN Comet change` and `END Comet change`.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct FilterExec {
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc<dyn PhysicalExpr>,
@@ -62,6 +65,8 @@ pub struct FilterExec {
default_selectivity: u8,
/// Properties equivalence properties, partitioning, etc.
cache: PlanProperties,
/// The projection indices of the columns in the output schema of join
projection: Option<Vec<usize>>,
Comment (Contributor):
Is this projection part of the migration? If so, the migration is quite complicated...

Reply (Member, PR author):
We currently maintain a copy of DataFusion's FilterExec with one small change, so I copied over the latest version to keep it in sync and then re-applied the change that we need (for memory safety, because of the way we re-use buffers).
}
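The doc comment at the top of this file explains the one Comet-specific change: input batches are never passed through unchanged. A minimal sketch of that behavior (hypothetical helper, not Comet's actual implementation, which builds the copy with MutableArrayData):

use arrow::compute::{concat_batches, filter_record_batch};
use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::ArrowError;

// Hypothetical sketch: always materialize a new batch, even when the
// predicate selects every row, so the output never aliases input buffers
// that Comet later re-uses.
fn filter_never_pass_through(
    batch: &RecordBatch,
    predicate: &BooleanArray,
) -> Result<RecordBatch, ArrowError> {
    if predicate.true_count() == batch.num_rows() {
        // concat_batches copies the underlying buffers even for a single
        // input batch, yielding an independent copy instead of the original.
        concat_batches(&batch.schema(), [batch])
    } else {
        // The usual path: arrow's filter kernel allocates new output arrays.
        filter_record_batch(batch, predicate)
    }
}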

impl FilterExec {
@@ -73,13 +78,15 @@ impl FilterExec {
match predicate.data_type(input.schema().as_ref())? {
DataType::Boolean => {
let default_selectivity = 20;
-                let cache = Self::compute_properties(&input, &predicate, default_selectivity)?;
+                let cache =
+                    Self::compute_properties(&input, &predicate, default_selectivity, None)?;
Ok(Self {
predicate,
input: Arc::clone(&input),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity,
cache,
projection: None,
})
}
other => {
@@ -101,6 +108,35 @@
Ok(self)
}

/// Return new instance of [FilterExec] with the given projection.
pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
// Check if the projection is valid
can_project(&self.schema(), projection.as_ref())?;

let projection = match projection {
Some(projection) => match &self.projection {
Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
None => Some(projection),
},
None => None,
};

let cache = Self::compute_properties(
&self.input,
&self.predicate,
self.default_selectivity,
projection.as_ref(),
)?;
Ok(Self {
predicate: Arc::clone(&self.predicate),
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
cache,
projection,
})
}
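The composition step above rewrites a new projection, which is expressed against the current output, in terms of the original input columns. A standalone illustration with hypothetical index values:

// If the existing projection keeps input columns [2, 0], then asking for
// column 1 of that output resolves to input column 0.
fn compose(existing: &[usize], new: &[usize]) -> Vec<usize> {
    new.iter().map(|i| existing[*i]).collect()
}

fn main() {
    assert_eq!(compose(&[2, 0], &[1]), vec![0]);
    assert_eq!(compose(&[2, 0], &[0, 1]), vec![2, 0]);
}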

/// The expression to filter on. This expression must evaluate to a boolean value.
pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
&self.predicate
@@ -116,6 +152,11 @@ impl FilterExec {
self.default_selectivity
}

/// Projection
pub fn projection(&self) -> Option<&Vec<usize>> {
self.projection.as_ref()
}

/// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
fn statistics_helper(
input: &Arc<dyn ExecutionPlan>,
@@ -168,11 +209,21 @@ impl FilterExec {
if binary.op() == &Operator::Eq {
// Filter evaluates to single value for all partitions
if input_eqs.is_expr_constant(binary.left()) {
-                    res_constants
-                        .push(ConstExpr::from(binary.right()).with_across_partitions(true))
+                    let (expr, across_parts) = (
+                        binary.right(),
+                        input_eqs.get_expr_constant_value(binary.right()),
+                    );
+                    res_constants.push(
+                        ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
+                    );
} else if input_eqs.is_expr_constant(binary.right()) {
-                    res_constants
-                        .push(ConstExpr::from(binary.left()).with_across_partitions(true))
+                    let (expr, across_parts) = (
+                        binary.left(),
+                        input_eqs.get_expr_constant_value(binary.left()),
+                    );
+                    res_constants.push(
+                        ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
+                    );
}
}
}
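Note on the API change above: in this DataFusion revision, with_across_partitions takes the new AcrossPartitions enum rather than a bool, so a constant can carry its known value (AcrossPartitions::Uniform(Some(value))) instead of only the fact that it is constant. For a predicate like a = 5 (an illustrative literal), a would now be recorded as uniformly 5 across all partitions.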
@@ -184,6 +235,7 @@
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
default_selectivity: u8,
projection: Option<&Vec<usize>>,
) -> Result<PlanProperties> {
// Combine the equal predicates with the input equivalence properties
// to construct the equivalence properties:
@@ -199,17 +251,32 @@
.into_iter()
.filter(|column| stats.column_statistics[column.index()].is_singleton())
.map(|column| {
+                let value = stats.column_statistics[column.index()]
+                    .min_value
+                    .get_value();
                 let expr = Arc::new(column) as _;
-                ConstExpr::new(expr).with_across_partitions(true)
+                ConstExpr::new(expr)
+                    .with_across_partitions(AcrossPartitions::Uniform(value.cloned()))
});
-        // this is for statistics
+        // This is for statistics
eq_properties = eq_properties.with_constants(constants);
-        // this is for logical constant (for example: a = '1', then a could be marked as a constant)
+        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
eq_properties = eq_properties.with_constants(Self::extend_constants(input, predicate));

let mut output_partitioning = input.output_partitioning().clone();
// If contains projection, update the PlanProperties.
if let Some(projection) = projection {
let schema = eq_properties.schema();
let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
let out_schema = project_schema(schema, Some(projection))?;
output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties);
eq_properties = eq_properties.project(&projection_mapping, out_schema);
}

Ok(PlanProperties::new(
eq_properties,
-            input.output_partitioning().clone(), // Output Partitioning
+            output_partitioning,
input.pipeline_behavior(),
input.boundedness(),
))
@@ -220,7 +287,27 @@ impl DisplayAs for FilterExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "FilterExec: {}", self.predicate)
let display_projections = if let Some(projection) = self.projection.as_ref() {
format!(
", projection=[{}]",
projection
.iter()
.map(|index| format!(
"{}@{}",
self.input.schema().fields().get(*index).unwrap().name(),
index
))
.collect::<Vec<_>>()
.join(", ")
)
} else {
"".to_string()
};
write!(
f,
"CometFilterExec: {}{}",
self.predicate, display_projections
)
}
}
}
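With the projection in place, the operator also renders it in EXPLAIN output. For a hypothetical plan that filters on column c and keeps columns a and c, the display would look roughly like:

CometFilterExec: c@2 > 5, projection=[a@0, c@2]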
@@ -245,7 +332,7 @@
}

fn maintains_input_order(&self) -> Vec<bool> {
-        // tell optimizer this operator doesn't reorder its input
+        // Tell optimizer this operator doesn't reorder its input
vec![true]
}

@@ -258,6 +345,7 @@ impl ExecutionPlan for FilterExec {
let selectivity = e.default_selectivity();
e.with_default_selectivity(selectivity)
})
.and_then(|e| e.with_projection(self.projection().cloned()))
.map(|e| Arc::new(e) as _)
}

@@ -274,10 +362,11 @@
);
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema(),
+            schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
}))
}

@@ -288,7 +377,13 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
-        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+        let stats =
+            Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?;
+        Ok(stats.project(self.projection.as_ref()))
}

fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
}

@@ -332,28 +427,41 @@ fn collect_new_statistics(
/// The FilterExec streams wraps the input iterator and applies the predicate expression to
/// determine which rows to include in its output batches
struct FilterExecStream {
-    /// Output schema, which is the same as the input schema for this operator
+    /// Output schema after the projection
schema: SchemaRef,
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc<dyn PhysicalExpr>,
/// The input partition to filter.
input: SendableRecordBatchStream,
-    /// runtime metrics recording
+    /// Runtime metrics recording
baseline_metrics: BaselineMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
}

-pub(crate) fn batch_filter(
+fn filter_and_project(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
projection: Option<&Vec<usize>>,
output_schema: &SchemaRef,
) -> Result<RecordBatch> {
predicate
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
-        Ok(match as_boolean_array(&array) {
-            // apply filter array to record batch
-            Ok(filter_array) => comet_filter_record_batch(batch, filter_array)?,
-            Err(_) => {
+        Ok(match (as_boolean_array(&array), projection) {
+            // Apply filter array to record batch
+            (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?,
+            (Ok(filter_array), Some(projection)) => {
+                let projected_columns = projection
+                    .iter()
+                    .map(|i| Arc::clone(batch.column(*i)))
+                    .collect();
+                let projected_batch =
+                    RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?;
Comment on lines +456 to +461 (Member):
Normally projection should come after the predicate, no?

Reply (Member):
Oh, you already have the predicate's filter result, filter_array. Then it doesn't matter.
+                comet_filter_record_batch(&projected_batch, filter_array)?
+            }
+            (Err(_), _) => {
return internal_err!("Cannot create filter_array from non-boolean predicates");
}
})
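As the thread above concludes, the ordering is safe because the boolean mask is evaluated against the full input batch before any columns are dropped, so applying the same mask to the projected columns keeps exactly the same rows. A self-contained sketch of that equivalence (hypothetical data, not Comet code):

use std::sync::Arc;
use arrow::compute::filter_record_batch;
use arrow_array::{cast::AsArray, types::Int32Type, BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(Int32Array::from(vec![10, 20, 30])),
        ],
    )?;

    // Mask computed against the full batch, e.g. from a predicate on column "a".
    let mask = BooleanArray::from(vec![true, false, true]);

    // Project column "b" first, then apply the precomputed mask.
    let projected_then_filtered = filter_record_batch(&batch.project(&[1])?, &mask)?;
    // Filter first, then project: the same rows survive.
    let filtered_then_projected = filter_record_batch(&batch, &mask)?.project(&[1])?;

    let b1 = projected_then_filtered.column(0).as_primitive::<Int32Type>();
    let b2 = filtered_then_projected.column(0).as_primitive::<Int32Type>();
    assert_eq!(b1.values().to_vec(), vec![10, 30]);
    assert_eq!(b2.values().to_vec(), vec![10, 30]);
    Ok(())
}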
@@ -397,9 +505,14 @@ impl Stream for FilterExecStream {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = batch_filter(&batch, &self.predicate)?;
+                    let filtered_batch = filter_and_project(
+                        &batch,
+                        &self.predicate,
+                        self.projection.as_ref(),
+                        &self.schema,
+                    )?;
timer.done();
-                    // skip entirely filtered batches
+                    // Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
Expand All @@ -416,7 +529,7 @@ impl Stream for FilterExecStream {
}

fn size_hint(&self) -> (usize, Option<usize>) {
-        // same number of record batches
+        // Same number of record batches
self.input.size_hint()
}
}
6 changes: 1 addition & 5 deletions native/core/src/execution/operators/scan.rs
@@ -304,11 +304,7 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef {
.map(|(idx, c)| {
let datatype = ScanExec::unpack_dictionary_type(c.data_type());
// We don't use the field name. Put a placeholder.
-            if matches!(datatype, DataType::Dictionary(_, _)) {
-                Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false)
Comment (Member, PR author):
It is no longer possible to re-use a dictionary id across fields. I am unsure of the impact here. Perhaps @viirya will know.

Reply (Member):
What do you mean by re-using a dictionary id across fields? A dictionary id should be unique per field.

Reply (Member):
Oh, corrected: if two fields have the same dictionary, they may use the same dictionary id.

Reply (Member):
I saw you resolved this. Is it not an issue now?

Reply (Contributor):
Is it possible to do some micro-benchmarks on this?

Reply (Member, PR author):
There is no performance impact from this change. The original code stored a dictionary id in the field metadata and the new code does not. This dictionary id is actually not used at all in FFI. It was used in Arrow IPC, but as of Arrow 54.0.0 that feature has been removed and Arrow IPC manages its own dictionary ids. We do not use Arrow IPC now because we are using our own proprietary encoding.

I will go ahead and run another TPC-H benchmark and post the results here today, just to confirm there are no regressions.

Reply (Contributor):
Just making sure: will this work even when the enableFastEncoding option is disabled?

Reply (Member, PR author):
Fresh benchmark results:

Using fast encoding = 332 seconds (our published time is 331, and I do see small variance on each run)
Using Arrow IPC = 334 seconds

Reply (Member, PR author):
@kazuyukitanimura does this address your concerns?
-            } else {
-                Field::new(format!("col_{}", idx), datatype, true)
-            }
+            Field::new(format!("col_{}", idx), datatype, true)
})
.collect::<Vec<Field>>()
}
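Following the thread above: since Arrow 54 drops the explicit dictionary id from Field (the dict_id argument that Field::new_dict carried), a dictionary column is now declared purely by its data type. A minimal illustration (not code from this PR):

use arrow_schema::{DataType, Field};

fn main() {
    // The dictionary encoding is fully described by the type; no id is needed.
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let field = Field::new("col_0", dict_type, true);
    assert!(matches!(field.data_type(), DataType::Dictionary(_, _)));
}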