Skip to content

Commit bc49ebb

Browse files
committed
Separate CLI display logic
1 parent 90c3365 commit bc49ebb

File tree

2 files changed

+64
-49
lines changed

2 files changed

+64
-49
lines changed

src/app/mod.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use crate::{cli, ui};
2424
use color_eyre::eyre::eyre;
2525
use color_eyre::Result;
2626
use crossterm::event as ct;
27+
use datafusion::arrow::util::pretty::pretty_format_batches;
28+
use datafusion::execution::SendableRecordBatchStream;
2729
use datafusion::sql::parser::DFParser;
2830
use datafusion::sql::sqlparser::dialect::GenericDialect;
2931
use futures::FutureExt;
@@ -376,9 +378,8 @@ impl CliApp {
376378
let dialect = GenericDialect {};
377379
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
378380
for statement in statements {
379-
self.execution
380-
.execute_and_print_statement(statement)
381-
.await?;
381+
let stream = self.execution.execute_statement(statement).await?;
382+
self.print_stream(stream).await;
382383
}
383384
Ok(())
384385
}
@@ -406,7 +407,7 @@ impl CliApp {
406407
if line.ends_with(';') {
407408
// TODO: if the query errors, should we keep trying to execute
408409
// the other queries in the file? That is what datafusion-cli does...
409-
self.execution.execute_and_print_stream_sql(&query).await?;
410+
self.execute_and_print_sql(&query).await?;
410411
query.clear();
411412
} else {
412413
query.push('\n');
@@ -416,9 +417,29 @@ impl CliApp {
416417
// run the last line(s) in file if the last statement doesn't contain ‘;’
417418
// ignore if it only consists of '\n'
418419
if query.contains(|c| c != '\n') {
419-
self.execution.execute_and_print_stream_sql(&query).await?;
420+
self.execute_and_print_sql(&query).await?;
420421
}
421422

422423
Ok(())
423424
}
425+
426+
/// executes a sql statement and prints the result to stdout
427+
pub async fn execute_and_print_sql(&self, sql: &str) -> Result<()> {
428+
let stream = self.execution.execute_sql(sql).await?;
429+
self.print_stream(stream).await;
430+
Ok(())
431+
}
432+
433+
/// Prints the stream to stdout
434+
async fn print_stream(&self, mut stream: SendableRecordBatchStream) {
435+
while let Some(maybe_batch) = stream.next().await {
436+
match maybe_batch {
437+
Ok(batch) => match pretty_format_batches(&[batch]) {
438+
Ok(d) => println!("{}", d),
439+
Err(e) => println!("Error formatting batch: {e}"),
440+
},
441+
Err(e) => println!("Error executing SQL: {e}"),
442+
}
443+
}
444+
}
424445
}

src/execution/mod.rs

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ use std::time::Duration;
2323
use color_eyre::eyre::Result;
2424
use datafusion::execution::runtime_env::RuntimeEnv;
2525
use datafusion::execution::session_state::SessionStateBuilder;
26-
use datafusion::execution::TaskContext;
27-
use datafusion::physical_plan::{execute_stream, visit_execution_plan, ExecutionPlanVisitor};
26+
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
27+
use datafusion::physical_plan::{
28+
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
29+
};
2830
use datafusion::prelude::*;
2931
use datafusion::sql::parser::Statement;
30-
use datafusion::{arrow::util::pretty::pretty_format_batches, physical_plan::ExecutionPlan};
3132
#[cfg(feature = "deltalake")]
3233
use deltalake::delta_datafusion::DeltaTableFactory;
3334
use log::{error, info};
@@ -197,7 +198,7 @@ impl ExecutionContext {
197198
}
198199
}
199200
} else {
200-
match self.execute_sql(sql, false).await {
201+
match self.execute_sql_and_discard_results(sql).await {
201202
Ok(_) => {
202203
let elapsed = start.elapsed();
203204
query.set_execution_time(elapsed);
@@ -215,53 +216,46 @@ impl ExecutionContext {
215216
Ok(())
216217
}
217218

218-
/// Executes the specified parsed DataFusion statement and discards the result
219-
pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> {
220-
let df = self.session_ctx.sql(sql).await?;
221-
self.execute_stream_dataframe(df, print).await
219+
/// Executes the specified sql string, driving it to completion but discarding the results
220+
pub async fn execute_sql_and_discard_results(
221+
&self,
222+
sql: &str,
223+
) -> datafusion::error::Result<()> {
224+
let mut stream = self.execute_sql(sql).await?;
225+
while let Some(maybe_batch) = stream.next().await {
226+
match maybe_batch {
227+
Ok(_) => {}
228+
Err(e) => {
229+
return Err(e);
230+
}
231+
}
232+
}
233+
Ok(())
222234
}
223235

224-
/// Executes the specified parsed DataFusion statement and prints the result to stdout
225-
pub async fn execute_and_print_statement(&self, statement: Statement) -> Result<()> {
236+
/// Executes the specified sql string, returning the resulting Stream
237+
pub async fn execute_sql(
238+
&self,
239+
sql: &str,
240+
) -> datafusion::error::Result<SendableRecordBatchStream> {
241+
self.session_ctx.sql(sql).await?.execute_stream().await
242+
}
243+
244+
/// Executes the specified parsed DataFusion statement, returning the resulting Stream
245+
pub async fn execute_statement(
246+
&self,
247+
statement: Statement,
248+
) -> datafusion::error::Result<SendableRecordBatchStream> {
226249
let plan = self
227250
.session_ctx
228251
.state()
229252
.statement_to_plan(statement)
230253
.await?;
231-
let df = self.session_ctx.execute_logical_plan(plan).await?;
232-
self.execute_stream_dataframe(df, true).await
233-
}
234-
235-
/// Executes the specified query and prints the result to stdout
236-
pub async fn execute_and_print_stream_sql(&self, query: &str) -> Result<()> {
237-
let df = self.session_ctx.sql(query).await?;
238-
self.execute_stream_dataframe(df, true).await
239-
}
240-
241-
pub async fn execute_stream_dataframe(&self, df: DataFrame, print: bool) -> Result<()> {
242-
let physical_plan = df.create_physical_plan().await?;
243-
let stream_cfg = SessionConfig::default();
244-
let stream_task_ctx = TaskContext::default().with_session_config(stream_cfg);
245-
let mut stream = execute_stream(physical_plan, stream_task_ctx.into()).unwrap();
246-
247-
while let Some(maybe_batch) = stream.next().await {
248-
if print {
249-
let batch = maybe_batch.unwrap();
250-
let d = pretty_format_batches(&[batch]).unwrap();
251-
println!("{}", d);
252-
} else {
253-
let _ = maybe_batch.unwrap();
254-
info!("Discarding batch");
255-
}
256-
}
257-
Ok(())
258-
}
259-
260-
pub async fn show_catalog(&self) -> Result<()> {
261-
let tables = self.session_ctx.sql("SHOW tables").await?.collect().await?;
262-
let formatted = pretty_format_batches(&tables).unwrap();
263-
println!("{}", formatted);
264-
Ok(())
254+
self.session_ctx
255+
.execute_logical_plan(plan)
256+
.await?
257+
.execute_stream()
258+
.await
265259
}
266260
}
267261

0 commit comments

Comments
 (0)