Skip to content

Upgrade datafusion and datafusion-federation #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
718 changes: 502 additions & 216 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 13 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ license = "Apache-2.0"
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."

[workspace.dependencies]
arrow = "54.2.1"
arrow-array = { version = "54.2.1" }
arrow-flight = { version = "54.2.1", features = [
arrow = "55.0.0"
arrow-array = { version = "55.0.0" }
arrow-flight = { version = "55.0.0", features = [
"flight-sql-experimental",
"tls",
] }
arrow-schema = { version = "54.2.1", features = ["serde"] }
arrow-json = "54.2.1"
arrow-odbc = { version = "16" }
datafusion = { version = "46", default-features = false }
datafusion-expr = { version = "46" }
datafusion-federation = { version = "=0.3.7" }
datafusion-ffi = { version = "46" }
datafusion-proto = { version = "46" }
datafusion-physical-expr = { version = "46" }
datafusion-physical-plan = { version = "46" }
arrow-schema = { version = "55.0.0", features = ["serde"] }
arrow-json = "55.0.0"
arrow-odbc = { version = "16.0.1" }
datafusion = { version = "47", default-features = false }
datafusion-expr = { version = "47" }
datafusion-federation = { version = "=0.4.2" }
datafusion-ffi = { version = "47" }
datafusion-proto = { version = "47" }
datafusion-physical-expr = { version = "47" }
datafusion-physical-plan = { version = "47" }
datafusion-table-providers = { path = "core" }
duckdb = { version = "=1.2.1", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
4 changes: 2 additions & 2 deletions core/src/duckdb/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,13 @@ pub(crate) mod tests {
dbconnection::duckdbconn::DuckDbConnection, duckdbpool::DuckDbConnectionPool,
},
};
use datafusion::arrow::array::RecordBatch;
use datafusion::{arrow::array::RecordBatch, datasource::sink::DataSink};
use datafusion::{
common::SchemaExt,
execution::{SendableRecordBatchStream, TaskContext},
logical_expr::dml::InsertOp,
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
physical_plan::{insert::DataSink, memory::MemoryStream},
physical_plan::memory::MemoryStream,
};
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::EnvFilter;
Expand Down
10 changes: 6 additions & 4 deletions core/src/duckdb/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError}
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::sql::unparser::dialect::Dialect;
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::sql::{
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
use futures::TryStreamExt;
use snafu::ResultExt;
Expand All @@ -23,14 +25,14 @@ impl<T, P> DuckDBTable<T, P> {
fn create_federated_table_source(
self: Arc<Self>,
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
let table_name = self.base_table.table_reference.to_quoted_string();
let table_reference = self.base_table.table_reference.clone();
let schema = Arc::clone(&Arc::clone(&self).base_table.schema());
let fed_provider = Arc::new(SQLFederationProvider::new(self));
Ok(Arc::new(SQLTableSource::new_with_schema(
fed_provider,
table_name,
RemoteTableRef::from(table_reference),
schema,
)?))
)))
}

pub fn create_federated_table_provider(
Expand Down
7 changes: 2 additions & 5 deletions core/src/duckdb/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ use arrow_schema::ArrowError;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::common::{Constraints, SchemaExt};
use datafusion::datasource::sink::{DataSink, DataSinkExec};
use datafusion::logical_expr::dml::InsertOp;
use datafusion::{
datasource::{TableProvider, TableType},
error::DataFusionError,
execution::{SendableRecordBatchStream, TaskContext},
logical_expr::Expr,
physical_plan::{
insert::{DataSink, DataSinkExec},
metrics::MetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan,
},
physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan},
};
use duckdb::Transaction;
use futures::StreamExt;
Expand Down
2 changes: 1 addition & 1 deletion core/src/flight/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ fn find_matching_column(
impl DisplayAs for FlightExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => write!(
DisplayFormatType::Default | DisplayFormatType::TreeRender => write!(
f,
"FlightExec: origin={}, streams={}",
self.config.origin,
Expand Down
10 changes: 6 additions & 4 deletions core/src/mysql/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::sql::sqlparser::ast::{self, VisitMut};
use datafusion::sql::unparser::dialect::Dialect;
use datafusion_federation::sql::{AstAnalyzer, SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::sql::{
AstAnalyzer, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
use futures::TryStreamExt;
use snafu::ResultExt;
Expand All @@ -24,14 +26,14 @@ impl MySQLTable {
fn create_federated_table_source(
self: Arc<Self>,
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
let table_name = self.base_table.table_reference.to_quoted_string();
let table_reference = self.base_table.table_reference.clone();
let schema = Arc::clone(&Arc::clone(&self).base_table.schema());
let fed_provider = Arc::new(SQLFederationProvider::new(self));
Ok(Arc::new(SQLTableSource::new_with_schema(
fed_provider,
table_name,
RemoteTableRef::from(table_reference),
schema,
)?))
)))
}

pub fn create_federated_table_provider(
Expand Down
40 changes: 25 additions & 15 deletions core/src/mysql/mysql_window.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use datafusion::sql::sqlparser::ast::{Expr, Function, Ident, VisitorMut, WindowType};
use datafusion::sql::sqlparser::ast::{
Expr, Function, Ident, ObjectNamePart, VisitorMut, WindowType,
};
use std::ops::ControlFlow;

#[derive(PartialEq, Eq)]
Expand Down Expand Up @@ -41,7 +43,7 @@ impl VisitorMut for MySQLWindowVisitor {

fn pre_visit_expr(&mut self, expr: &mut Expr) -> ControlFlow<Self::Break> {
if let Expr::Function(func) = expr {
if let Some(Ident { value, .. }) = func.name.0.first() {
if let Some(ObjectNamePart::Identifier(Ident { value, .. })) = func.name.0.first() {
// match for some scalars that support window functions
// all of them need to remove nulls first/last, but only rank removes the frame clause
if let Some(func_type) = FunctionType::from_str(value) {
Expand All @@ -58,7 +60,7 @@ impl MySQLWindowVisitor {
pub fn remove_nulls_first_last(func: &mut Function) {
if let Some(WindowType::WindowSpec(spec)) = func.over.as_mut() {
for order_by in &mut spec.order_by {
order_by.nulls_first = None; // nulls first/last are not supported in MySQL
order_by.options.nulls_first = None; // nulls first/last are not supported in MySQL
}
}
}
Expand All @@ -83,11 +85,11 @@ mod test {
#[test]
fn test_remove_frame_clause() {
let mut func = Function {
name: ObjectName(vec![Ident {
name: ObjectName(vec![ObjectNamePart::Identifier(Ident {
value: "RANK".to_string(),
quote_style: None,
span: Span::empty(),
}]),
})]),
args: ast::FunctionArguments::None,
over: Some(WindowType::WindowSpec(ast::WindowSpec {
window_name: None,
Expand All @@ -96,8 +98,10 @@ mod test {
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
Token::Char('*'),
))),
asc: None,
nulls_first: Some(true),
options: sqlparser::ast::OrderByOptions {
asc: None,
nulls_first: Some(true),
},
with_fill: None,
}],
window_frame: Some(WindowFrame {
Expand All @@ -120,8 +124,10 @@ mod test {
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
Token::Char('*'),
))),
asc: None,
nulls_first: Some(true),
options: sqlparser::ast::OrderByOptions {
asc: None,
nulls_first: Some(true),
},
with_fill: None,
}],
window_frame: None,
Expand All @@ -135,11 +141,11 @@ mod test {
#[test]
fn test_remove_nulls_first_last() {
let mut func = Function {
name: ObjectName(vec![Ident {
name: ObjectName(vec![ObjectNamePart::Identifier(Ident {
value: "RANK".to_string(),
quote_style: None,
span: Span::empty(),
}]),
})]),
args: sqlparser::ast::FunctionArguments::None,
over: Some(WindowType::WindowSpec(sqlparser::ast::WindowSpec {
window_name: None,
Expand All @@ -148,8 +154,10 @@ mod test {
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
Token::Char('*'),
))),
asc: None,
nulls_first: Some(true),
options: sqlparser::ast::OrderByOptions {
asc: None,
nulls_first: Some(true),
},
with_fill: None,
}],
window_frame: Some(WindowFrame {
Expand All @@ -172,8 +180,10 @@ mod test {
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
Token::Char('*'),
))),
asc: None,
nulls_first: None,
options: sqlparser::ast::OrderByOptions {
asc: None,
nulls_first: None,
},
with_fill: None,
}],
window_frame: Some(WindowFrame {
Expand Down
7 changes: 2 additions & 5 deletions core/src/mysql/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@ use crate::util::retriable_error::check_and_mark_retriable_error;
use crate::util::{constraints, to_datafusion_error};
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::sink::{DataSink, DataSinkExec};
use datafusion::{
catalog::Session,
datasource::{TableProvider, TableType},
execution::{SendableRecordBatchStream, TaskContext},
logical_expr::{dml::InsertOp, Expr},
physical_plan::{
insert::{DataSink, DataSinkExec},
metrics::MetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan,
},
physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan},
};
use futures::StreamExt;
use mysql_async::TxOpts;
Expand Down
11 changes: 5 additions & 6 deletions core/src/postgres/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ use async_trait::async_trait;
use datafusion::{
catalog::Session,
common::{Constraints, SchemaExt},
datasource::{TableProvider, TableType},
datasource::{
sink::{DataSink, DataSinkExec},
TableProvider, TableType,
},
execution::{SendableRecordBatchStream, TaskContext},
logical_expr::{dml::InsertOp, Expr},
physical_plan::{
insert::{DataSink, DataSinkExec},
metrics::MetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan,
},
physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan},
};
use futures::StreamExt;
use snafu::prelude::*;
Expand Down
3 changes: 1 addition & 2 deletions core/src/sql/arrow_sql_gen/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
};
let Some(builder) = builder.as_any_mut().downcast_mut::<StringBuilder>()
else {
let Some(builder) = builder.as_any_mut().downcast_mut::<StringBuilder>() else {
return FailedToDowncastBuilderSnafu {
postgres_type: format!("{postgres_type}"),
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/sql/db_connection_pool/dbconnection/duckdbconn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;
use std::sync::Arc;
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow_schema::{DataType, Field};
Expand Down
10 changes: 6 additions & 4 deletions core/src/sql/sql_provider_datafusion/federation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::sql::db_connection_pool::{dbconnection::get_schema, JoinPushDown};
use async_trait::async_trait;
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::sql::{
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
use futures::TryStreamExt;
use snafu::prelude::*;
Expand All @@ -20,14 +22,14 @@ impl<T, P> SqlTable<T, P> {
fn create_federated_table_source(
self: Arc<Self>,
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
let table_name = self.table_reference.to_quoted_string();
let table_reference = self.table_reference.clone();
let schema = Arc::clone(&self.schema);
let fed_provider = Arc::new(SQLFederationProvider::new(self));
Ok(Arc::new(SQLTableSource::new_with_schema(
fed_provider,
table_name,
RemoteTableRef::from(table_reference),
schema,
)?))
)))
}

pub fn create_federated_table_provider(
Expand Down
10 changes: 6 additions & 4 deletions core/src/sqlite/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::sql::sqlparser::ast::{self, VisitMut};
use datafusion::sql::unparser::dialect::Dialect;
use datafusion_federation::sql::{AstAnalyzer, SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::sql::{
AstAnalyzer, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
use futures::TryStreamExt;
use snafu::ResultExt;
Expand All @@ -24,14 +26,14 @@ impl<T, P> SQLiteTable<T, P> {
fn create_federated_table_source(
self: Arc<Self>,
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
let table_name = self.base_table.table_reference.to_quoted_string();
let table_reference = self.base_table.table_reference.clone();
let schema = Arc::clone(&Arc::clone(&self).base_table.schema());
let fed_provider = Arc::new(SQLFederationProvider::new(self));
Ok(Arc::new(SQLTableSource::new_with_schema(
fed_provider,
table_name,
RemoteTableRef::from(table_reference),
schema,
)?))
)))
}

pub fn create_federated_table_provider(
Expand Down
Loading
Loading