Skip to content

Commit 90aa47e

Browse files
authored
Support Substrait Round-Trip of EmptyRelation Including produce_one_row Semantics (#18842)
## Which issue does this PR close? * Closes #16271. ## Rationale for this change This change resolves a long‑standing gap in DataFusion’s Substrait round‑trip implementation where `EmptyRelation` with `produce_one_row=true` could not be encoded or decoded. This limitation caused ~1800 sqllogictest cases to fail, particularly those involving queries without a FROM clause (e.g., `SELECT 1`). By adding full support for recognizing and producing the Substrait VirtualTable pattern representing a “phantom row,” DataFusion can now faithfully round‑trip logical plans that use empty relations to provide scalar evaluation contexts. This unblocks broader Substrait compatibility and improves consistency across logical plan conversions. ## What changes are included in this PR? * Implement detection of the Substrait VirtualTable patterns representing `produce_one_row` and map them to `LogicalPlan::EmptyRelation`. * Add `from_empty_relation` encoding logic that emits a properly structured VirtualTable, including default literal values when `produce_one_row=true`. * Refactor literal row conversion into a helper (`convert_literal_rows`) for clarity and reuse. * Improve field‑count validation for expression‑based VirtualTables. * Add comprehensive round‑trip test coverage: * SELECT without FROM * Mixed‑type EmptyRelation with phantom row * EmptyRelation with zero rows * Subqueries involving EmptyRelation ## Are these changes tested? Yes. New integration tests exercise all permutations of EmptyRelation encoding and decoding, including edge cases related to schema handling and subqueries. These tests ensure round‑trip correctness and prevent regressions. ## Are there any user-facing changes? No user‑facing API changes. This PR improves Substrait interoperability internally. ## LLM-generated code disclosure This PR includes LLM‑generated code and comments. All generated content has been manually reviewed and tested.
1 parent 434a23b commit 90aa47e

File tree

3 files changed

+160
-47
lines changed

3 files changed

+160
-47
lines changed

datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs

Lines changed: 71 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -121,57 +121,53 @@ pub async fn from_read_rel(
121121
}));
122122
}
123123

124+
// Check for produce_one_row pattern in both old (values) and new (expressions) formats.
125+
// A VirtualTable with exactly one row containing only empty/default fields represents
126+
// an EmptyRelation with produce_one_row=true. This pattern is used for queries without
127+
// a FROM clause (e.g., "SELECT 1 AS one") where a single phantom row is needed to
128+
// provide a context for evaluating scalar expressions. This is conceptually similar to
129+
// the SQL "DUAL" table (see: https://en.wikipedia.org/wiki/DUAL_table) which some
130+
// databases provide as a single-row source for selecting constant expressions when no
131+
// real table is present.
132+
let is_produce_one_row = (vt.values.len() == 1
133+
&& vt.expressions.is_empty()
134+
&& substrait_schema.fields().is_empty()
135+
&& vt.values[0].fields.is_empty())
136+
|| (vt.expressions.len() == 1
137+
&& vt.values.is_empty()
138+
&& substrait_schema.fields().is_empty()
139+
&& vt.expressions[0].fields.is_empty());
140+
141+
if is_produce_one_row {
142+
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
143+
produce_one_row: true,
144+
schema: DFSchemaRef::new(substrait_schema),
145+
}));
146+
}
147+
124148
let values = if !vt.expressions.is_empty() {
125149
let mut exprs = vec![];
126150
for row in &vt.expressions {
127-
let mut name_idx = 0;
128151
let mut row_exprs = vec![];
129152
for expression in &row.fields {
130-
name_idx += 1;
131153
let expr = consumer
132-
.consume_expression(expression, &DFSchema::empty())
154+
.consume_expression(expression, &substrait_schema)
133155
.await?;
134156
row_exprs.push(expr);
135157
}
136-
if name_idx != named_struct.names.len() {
158+
// For expressions, validate against top-level schema fields, not nested names
159+
if row_exprs.len() != substrait_schema.fields().len() {
137160
return substrait_err!(
138-
"Names list must match exactly to nested schema, but found {} uses for {} names",
139-
name_idx,
140-
named_struct.names.len()
161+
"Field count mismatch: expected {} fields but found {} in virtual table row",
162+
substrait_schema.fields().len(),
163+
row_exprs.len()
141164
);
142165
}
143166
exprs.push(row_exprs);
144167
}
145168
exprs
146169
} else {
147-
vt
148-
.values
149-
.iter()
150-
.map(|row| {
151-
let mut name_idx = 0;
152-
let lits = row
153-
.fields
154-
.iter()
155-
.map(|lit| {
156-
name_idx += 1; // top-level names are provided through schema
157-
Ok(Expr::Literal(from_substrait_literal(
158-
consumer,
159-
lit,
160-
&named_struct.names,
161-
&mut name_idx,
162-
)?, None))
163-
})
164-
.collect::<datafusion::common::Result<_>>()?;
165-
if name_idx != named_struct.names.len() {
166-
return substrait_err!(
167-
"Names list must match exactly to nested schema, but found {} uses for {} names",
168-
name_idx,
169-
named_struct.names.len()
170-
);
171-
}
172-
Ok(lits)
173-
})
174-
.collect::<datafusion::common::Result<_>>()?
170+
convert_literal_rows(consumer, vt, named_struct)?
175171
};
176172

177173
Ok(LogicalPlan::Values(Values {
@@ -226,6 +222,46 @@ pub async fn from_read_rel(
226222
}
227223
}
228224

225+
/// Converts Substrait literal rows from a VirtualTable into DataFusion expressions.
226+
///
227+
/// This function processes the deprecated `values` field of VirtualTable, converting
228+
/// each literal value into a `Expr::Literal` while tracking and validating the name
229+
/// indices against the provided named struct schema.
230+
fn convert_literal_rows(
231+
consumer: &impl SubstraitConsumer,
232+
vt: &substrait::proto::read_rel::VirtualTable,
233+
named_struct: &substrait::proto::NamedStruct,
234+
) -> datafusion::common::Result<Vec<Vec<Expr>>> {
235+
#[allow(deprecated)]
236+
vt.values
237+
.iter()
238+
.map(|row| {
239+
let mut name_idx = 0;
240+
let lits = row
241+
.fields
242+
.iter()
243+
.map(|lit| {
244+
name_idx += 1; // top-level names are provided through schema
245+
Ok(Expr::Literal(from_substrait_literal(
246+
consumer,
247+
lit,
248+
&named_struct.names,
249+
&mut name_idx,
250+
)?, None))
251+
})
252+
.collect::<datafusion::common::Result<_>>()?;
253+
if name_idx != named_struct.names.len() {
254+
return substrait_err!(
255+
"Names list must match exactly to nested schema, but found {} uses for {} names",
256+
name_idx,
257+
named_struct.names.len()
258+
);
259+
}
260+
Ok(lits)
261+
})
262+
.collect::<datafusion::common::Result<_>>()
263+
}
264+
229265
pub fn apply_masking(
230266
schema: DFSchema,
231267
mask_expression: &::core::option::Option<MaskExpression>,

datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
use crate::logical_plan::producer::{
1919
to_substrait_literal, to_substrait_named_struct, SubstraitProducer,
2020
};
21-
use datafusion::common::{not_impl_err, substrait_datafusion_err, DFSchema, ToDFSchema};
21+
use datafusion::common::{substrait_datafusion_err, DFSchema, ToDFSchema};
2222
use datafusion::logical_expr::utils::conjunction;
2323
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
24+
use datafusion::scalar::ScalarValue;
2425
use std::sync::Arc;
2526
use substrait::proto::expression::literal::Struct as LiteralStruct;
2627
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
@@ -146,26 +147,61 @@ pub fn from_table_scan(
146147
}))
147148
}
148149

150+
/// Encodes an EmptyRelation as a Substrait VirtualTable.
151+
///
152+
/// EmptyRelation represents a relation with no input data. When `produce_one_row` is true,
153+
/// it generates a single row with all fields set to their default values (typically NULL).
154+
/// This is used for queries without a FROM clause, such as "SELECT 1 AS one" or
155+
/// "SELECT current_timestamp()".
156+
///
157+
/// When `produce_one_row` is false, it represents a truly empty relation with no rows,
158+
/// used in optimizations or as a placeholder.
149159
pub fn from_empty_relation(
150160
producer: &mut impl SubstraitProducer,
151161
e: &EmptyRelation,
152162
) -> datafusion::common::Result<Box<Rel>> {
153-
if e.produce_one_row {
154-
return not_impl_err!("Producing a row from empty relation is unsupported");
155-
}
156-
#[allow(deprecated)]
163+
let base_schema = to_substrait_named_struct(producer, &e.schema)?;
164+
165+
let read_type = if e.produce_one_row {
166+
// Create one row with default scalar values for each field in the schema.
167+
// For example, an Int32 field gets Int32(NULL), a Utf8 field gets Utf8(NULL), etc.
168+
// This represents the "phantom row" that provides a context for evaluating
169+
// scalar expressions in queries without a FROM clause.
170+
let fields = e
171+
.schema
172+
.fields()
173+
.iter()
174+
.map(|f| {
175+
let scalar = ScalarValue::try_from(f.data_type())?;
176+
to_substrait_literal(producer, &scalar)
177+
})
178+
.collect::<datafusion::common::Result<_>>()?;
179+
180+
ReadType::VirtualTable(VirtualTable {
181+
// Use deprecated 'values' field instead of 'expressions' because the consumer's
182+
// nested expression support (RexType::Nested) is not yet implemented.
183+
// The 'values' field uses literal::Struct which the consumer can properly
184+
// deserialize with field name preservation.
185+
#[allow(deprecated)]
186+
values: vec![LiteralStruct { fields }],
187+
expressions: vec![],
188+
})
189+
} else {
190+
ReadType::VirtualTable(VirtualTable {
191+
#[allow(deprecated)]
192+
values: vec![],
193+
expressions: vec![],
194+
})
195+
};
157196
Ok(Box::new(Rel {
158197
rel_type: Some(RelType::Read(Box::new(ReadRel {
159198
common: None,
160-
base_schema: Some(to_substrait_named_struct(producer, &e.schema)?),
199+
base_schema: Some(base_schema),
161200
filter: None,
162201
best_effort_filter: None,
163202
projection: None,
164203
advanced_extension: None,
165-
read_type: Some(ReadType::VirtualTable(VirtualTable {
166-
values: vec![],
167-
expressions: vec![],
168-
})),
204+
read_type: Some(read_type),
169205
}))),
170206
}))
171207
}
@@ -203,6 +239,7 @@ pub fn from_values(
203239
projection: None,
204240
advanced_extension: None,
205241
read_type: Some(ReadType::VirtualTable(VirtualTable {
242+
#[allow(deprecated)]
206243
values,
207244
expressions,
208245
})),

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use datafusion::execution::registry::SerializerRegistry;
3535
use datafusion::execution::runtime_env::RuntimeEnv;
3636
use datafusion::execution::session_state::SessionStateBuilder;
3737
use datafusion::logical_expr::{
38-
Extension, InvariantLevel, LogicalPlan, PartitionEvaluator, Repartition,
39-
UserDefinedLogicalNode, Values, Volatility,
38+
EmptyRelation, Extension, InvariantLevel, LogicalPlan, PartitionEvaluator,
39+
Repartition, UserDefinedLogicalNode, Values, Volatility,
4040
};
4141
use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST;
4242
use datafusion::prelude::*;
@@ -188,6 +188,46 @@ async fn simple_select() -> Result<()> {
188188
roundtrip("SELECT a, b FROM data").await
189189
}
190190

191+
#[tokio::test]
192+
async fn roundtrip_literal_without_from() -> Result<()> {
193+
roundtrip("SELECT 1 AS one").await
194+
}
195+
196+
#[tokio::test]
197+
async fn roundtrip_empty_relation_with_schema() -> Result<()> {
198+
// Test produce_one_row=true with multiple typed columns
199+
roundtrip("SELECT 1::int as a, 'hello'::text as b, 3.14::double as c").await
200+
}
201+
202+
#[tokio::test]
203+
async fn roundtrip_empty_relation_no_rows() -> Result<()> {
204+
// Test produce_one_row=false
205+
let ctx = create_context().await?;
206+
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
207+
produce_one_row: false,
208+
schema: DFSchemaRef::new(DFSchema::empty()),
209+
});
210+
roundtrip_logical_plan_with_ctx(plan, ctx).await?;
211+
Ok(())
212+
}
213+
214+
#[tokio::test]
215+
async fn roundtrip_subquery_with_empty_relation() -> Result<()> {
216+
// Test EmptyRelation in the context of scalar subqueries.
217+
// The optimizer may simplify the subquery away, but we're testing that
218+
// the EmptyRelation round-trips correctly when it appears in the plan.
219+
let ctx = create_context().await?;
220+
let df = ctx.sql("SELECT (SELECT 1) as nested").await?;
221+
let plan = df.into_optimized_plan()?;
222+
223+
// Just verify the round-trip succeeds and produces valid results
224+
let proto = to_substrait_plan(&plan, &ctx.state())?;
225+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
226+
let df2 = DataFrame::new(ctx.state(), plan2);
227+
df2.show().await?;
228+
Ok(())
229+
}
230+
191231
#[tokio::test]
192232
async fn wildcard_select() -> Result<()> {
193233
let plan = generate_plan_from_sql("SELECT * FROM data", true, false).await?;

0 commit comments

Comments
 (0)