Skip to content
58 changes: 45 additions & 13 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ impl DFSchema {
}

/// Check to see if fields in 2 Arrow schemas are compatible
#[deprecated(since = "47.0.0", note = "This method is no longer used")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a replacement method for this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have a similar API already:

logically_equivalent_names_and_types

pub fn check_arrow_schema_type_compatible(
&self,
arrow_schema: &Schema,
Expand Down Expand Up @@ -604,26 +605,57 @@ impl DFSchema {
})
}

/// Returns `true` if [`Self::has_equivalent_names_and_types`] returns `Ok`
/// for `other`, and `false` otherwise.
///
/// This is a thin boolean wrapper kept for backward compatibility; the
/// descriptive error produced by the underlying check is discarded.
#[deprecated(
    since = "47.0.0",
    note = "Use `has_equivalent_names_and_types` instead"
)]
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
    self.has_equivalent_names_and_types(other).is_ok()
}

/// Returns `Ok` if the two schemas have the same qualified named
/// fields with compatible data types.
///
/// This is a specialized version of Eq that ignores differences
/// in nullability and metadata.
/// Returns an `Err` with a message otherwise.
///
/// This is a specialized version of Eq that ignores differences in
/// nullability and metadata.
///
/// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker
/// logical type check, which for example would consider a dictionary
/// encoded UTF8 array to be equivalent to a plain UTF8 array.
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
// case 1 : schema length mismatch
if self.fields().len() != other.fields().len() {
return false;
_plan_err!(
"Schema mismatch: the schema length are not same \
Expected schema length: {}, got: {}",
self.fields().len(),
other.fields().len()
)
} else {
// case 2 : schema length match, but fields mismatch
// check if the fields name are the same and have the same data types
self.fields()
.iter()
.zip(other.fields().iter())
.try_for_each(|(f1, f2)| {
if f1.name() != f2.name()
|| (!DFSchema::datatype_is_semantically_equal(
f1.data_type(),
f2.data_type(),
) && !can_cast_types(f2.data_type(), f1.data_type()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhuqi-lucas this addition of "can_cast_types" broke the Substrait consumer (since we use this logic to decide whether we should cast things to get names to match - this now always passes).

But also, overall, using can_cast_types here doesn't feel right. can_cast_types is very lenient. I think what you'd want to do is to allow Utf8 -> Utf8View in logically_equivalent_names_and_types and switch the call site you're interested in to use that one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Blizzara for testing and for the feedback. I am sorry for breaking the Substrait consumer, which I was not aware of. I will help review when you submit the fix.

@zhuqi-lucas this addition of "can_cast_types" broke the Substrait consumer (since we use this logic to decide whether we should cast things to get names to match - this now always passes).

But also, overall, using can_cast_types here doesn't feel right. can_cast_types is very lenient. I think what you'd want to do is to allow Utf8 -> Utf8View in logically_equivalent_names_and_types and switch the call site you're interested in to use that one.

{
_plan_err!(
"Schema mismatch: Expected field '{}' with type {:?}, \
but got '{}' with type {:?}.",
f1.name(),
f1.data_type(),
f2.name(),
f2.data_type()
)
} else {
Ok(())
}
})
}
let self_fields = self.iter();
let other_fields = other.iter();
self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
q1 == q2
&& f1.name() == f2.name()
&& Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
})
}

/// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
/// Returns an error if the plan does not have the expected schema.
/// Ignores metadata and nullability.
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
let equivalent = plan.schema().equivalent_names_and_types(schema);
let compatible = plan.schema().has_equivalent_names_and_types(schema);

if !equivalent {
if let Err(e) = compatible {
internal_err!(
"Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
"Failed due to a difference in schemas: {e}, original schema: {:?}, new schema: {:?}",
schema,
plan.schema()
)
Expand Down
31 changes: 7 additions & 24 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ fn assert_valid_optimization(
plan: &LogicalPlan,
prev_schema: &Arc<DFSchema>,
) -> Result<()> {
// verify invariant: optimizer passes should not change the schema
// verify invariant: optimizer passes should not change the schema, unless the new field types can be cast from the previous schema's field types.
// Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
assert_expected_schema(prev_schema, plan)?;

Expand All @@ -459,7 +459,9 @@ mod tests {
use std::sync::{Arc, Mutex};

use datafusion_common::tree_node::Transformed;
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion_common::{
assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

Expand Down Expand Up @@ -505,28 +507,9 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
});
let err = opt.optimize(plan, &config, &observe).unwrap_err();
assert!(err.strip_backtrace().starts_with(
"Optimizer rule 'get table_scan rule' failed\n\
caused by\n\
Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\
caused by\n\
Internal error: Failed due to a difference in schemas, \
original schema: DFSchema { inner: Schema { \
fields: [], \
metadata: {} }, \
field_qualifiers: [], \
functional_dependencies: FunctionalDependencies { deps: [] } \
}, \
new schema: DFSchema { inner: Schema { \
fields: [\
Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\
], \
metadata: {} }, \
field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \
functional_dependencies: FunctionalDependencies { deps: [] } }",
));

// Simplified assertion: only check that the error contains the expected schema length mismatch message
assert_contains!(err.strip_backtrace(), "Schema mismatch: the schema length are not same Expected schema length: 3, got: 0");
}

#[test]
Expand Down
8 changes: 7 additions & 1 deletion datafusion/sqllogictest/test_files/expr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,14 @@ a
statement ok
create table foo (a varchar, b varchar) as values ('a', 'b');


query T
SELECT concat_ws('', a, b,'c') from foo
----
abc

query T
SELECT concat_ws('',a,b,'c') from foo
SELECT concat_ws('',arrow_cast(a, 'Utf8View'),arrow_cast(b, 'Utf8View'),'c') from foo
----
abc

Expand Down
2 changes: 1 addition & 1 deletion datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ pub async fn from_substrait_plan_with_consumer(
return Ok(plan);
}
let renamed_schema = make_renamed_schema(plan.schema(), &root.names)?;
if renamed_schema.equivalent_names_and_types(plan.schema()) {
if renamed_schema.has_equivalent_names_and_types(plan.schema()).is_ok() {
// Nothing to do if the schema is already equivalent
return Ok(plan);
}
Expand Down