Skip to content

Commit 44857d1

Browse files
committed
Separate CLI display logic
1 parent 8185dfe commit 44857d1

File tree

4 files changed

+171
-122
lines changed

4 files changed

+171
-122
lines changed

src/app/app_execution.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`AppExecution`]: Handles executing queries for the TUI application.
19+
20+
use crate::app::state::tabs::sql::Query;
21+
use crate::app::AppEvent;
22+
use crate::execution::ExecutionContext;
23+
use color_eyre::eyre::Result;
24+
use futures::StreamExt;
25+
use log::{error, info};
26+
use std::sync::Arc;
27+
use std::time::Duration;
28+
use tokio::sync::mpsc::UnboundedSender;
29+
30+
/// Handles executing queries for the TUI application, formatting results
31+
/// and sending them to the UI.
32+
pub(crate) struct AppExecution {
33+
inner: Arc<ExecutionContext>,
34+
}
35+
36+
impl AppExecution {
37+
/// Create a new instance of [`AppExecution`].
38+
pub fn new(inner: Arc<ExecutionContext>) -> Self {
39+
Self { inner }
40+
}
41+
42+
/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
43+
///
44+
/// All queries except the last one will have their results discarded.
45+
///
46+
/// Error handling: If an error occurs while executing a query, the error is
47+
/// logged and execution continues
48+
pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
49+
// We need to filter out empty strings to correctly determine the last query for displaying
50+
// results.
51+
info!("Running sqls: {:?}", sqls);
52+
let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
53+
info!("Non empty SQLs: {:?}", non_empty_sqls);
54+
let statement_count = non_empty_sqls.len();
55+
for (i, sql) in non_empty_sqls.into_iter().enumerate() {
56+
info!("Running query {}", i);
57+
let _sender = sender.clone();
58+
let mut query =
59+
Query::new(sql.to_string(), None, None, None, Duration::default(), None);
60+
let start = std::time::Instant::now();
61+
if i == statement_count - 1 {
62+
info!("Executing last query and display results");
63+
match self.inner.execute_sql(sql).await {
64+
Ok(mut stream) => {
65+
let mut batches = Vec::new();
66+
while let Some(maybe_batch) = stream.next().await {
67+
match maybe_batch {
68+
Ok(batch) => {
69+
batches.push(batch);
70+
}
71+
Err(e) => {
72+
let elapsed = start.elapsed();
73+
query.set_error(Some(e.to_string()));
74+
query.set_execution_time(elapsed);
75+
break;
76+
}
77+
}
78+
}
79+
let elapsed = start.elapsed();
80+
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
81+
query.set_results(Some(batches));
82+
query.set_num_rows(Some(rows));
83+
query.set_execution_time(elapsed);
84+
}
85+
Err(e) => {
86+
error!("Error creating dataframe: {:?}", e);
87+
let elapsed = start.elapsed();
88+
query.set_error(Some(e.to_string()));
89+
query.set_execution_time(elapsed);
90+
}
91+
}
92+
} else {
93+
match self.inner.execute_sql_and_discard_results(sql).await {
94+
Ok(_) => {
95+
let elapsed = start.elapsed();
96+
query.set_execution_time(elapsed);
97+
}
98+
Err(e) => {
99+
// We only log failed queries, we don't want to stop the execution of the
100+
// remaining queries. Perhaps there should be a configuration option for
101+
// this though in case the user wants to stop execution on the first error.
102+
error!("Error executing {sql}: {:?}", e);
103+
}
104+
}
105+
}
106+
_sender.send(AppEvent::QueryResult(query))?; // Send the query result to the UI
107+
}
108+
Ok(())
109+
}
110+
}

src/app/handlers/sql.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use ratatui::crossterm::event::{KeyCode, KeyEvent, KeyModifiers};
2323
use tokio_stream::StreamExt;
2424

2525
use super::App;
26+
use crate::app::app_execution::AppExecution;
2627
use crate::app::{handlers::tab_navigation_handler, state::tabs::sql::Query, AppEvent};
2728
use crate::execution::collect_plan_stats;
2829

@@ -64,14 +65,14 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
6465
info!("Run query");
6566
let sql = app.state.sql_tab.editor().lines().join("");
6667
info!("SQL: {}", sql);
67-
let execution = Arc::clone(&app.execution);
68+
let app_execution = AppExecution::new(Arc::clone(&app.execution));
6869
let _event_tx = app.app_event_tx.clone();
6970
// TODO: Maybe this should be on a separate runtime to prevent blocking main thread /
7071
// runtime
7172
// TODO: Extract this into function to be used in both normal and editable handler
7273
tokio::spawn(async move {
7374
let sqls: Vec<&str> = sql.split(';').collect();
74-
let _ = execution.run_sqls(sqls, _event_tx).await;
75+
let _ = app_execution.run_sqls(sqls, _event_tx).await;
7576
});
7677
}
7778
_ => {}

src/app/mod.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod app_execution;
1819
pub mod config;
1920
pub mod handlers;
2021
pub mod state;
@@ -24,6 +25,8 @@ use crate::{cli, ui};
2425
use color_eyre::eyre::eyre;
2526
use color_eyre::Result;
2627
use crossterm::event as ct;
28+
use datafusion::arrow::util::pretty::pretty_format_batches;
29+
use datafusion::execution::SendableRecordBatchStream;
2730
use datafusion::sql::parser::DFParser;
2831
use datafusion::sql::sqlparser::dialect::GenericDialect;
2932
use futures::FutureExt;
@@ -376,9 +379,8 @@ impl CliApp {
376379
let dialect = GenericDialect {};
377380
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
378381
for statement in statements {
379-
self.execution
380-
.execute_and_print_statement(statement)
381-
.await?;
382+
let stream = self.execution.execute_statement(statement).await?;
383+
self.print_stream(stream).await;
382384
}
383385
Ok(())
384386
}
@@ -406,7 +408,7 @@ impl CliApp {
406408
if line.ends_with(';') {
407409
// TODO: if the query errors, should we keep trying to execute
408410
// the other queries in the file? That is what datafusion-cli does...
409-
self.execution.execute_and_print_stream_sql(&query).await?;
411+
self.execute_and_print_sql(&query).await?;
410412
query.clear();
411413
} else {
412414
query.push('\n');
@@ -416,9 +418,29 @@ impl CliApp {
416418
// run the last line(s) in file if the last statement doesn't contain ‘;’
417419
// ignore if it only consists of '\n'
418420
if query.contains(|c| c != '\n') {
419-
self.execution.execute_and_print_stream_sql(&query).await?;
421+
self.execute_and_print_sql(&query).await?;
420422
}
421423

422424
Ok(())
423425
}
426+
427+
/// executes a sql statement and prints the result to stdout
428+
pub async fn execute_and_print_sql(&self, sql: &str) -> Result<()> {
429+
let stream = self.execution.execute_sql(sql).await?;
430+
self.print_stream(stream).await;
431+
Ok(())
432+
}
433+
434+
/// Prints the stream to stdout
435+
async fn print_stream(&self, mut stream: SendableRecordBatchStream) {
436+
while let Some(maybe_batch) = stream.next().await {
437+
match maybe_batch {
438+
Ok(batch) => match pretty_format_batches(&[batch]) {
439+
Ok(d) => println!("{}", d),
440+
Err(e) => println!("Error formatting batch: {e}"),
441+
},
442+
Err(e) => println!("Error executing SQL: {e}"),
443+
}
444+
}
445+
}
424446
}

src/execution/mod.rs

Lines changed: 31 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,17 @@
1818
//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
1919
//!
2020
use std::sync::Arc;
21-
use std::time::Duration;
2221

2322
use color_eyre::eyre::Result;
2423
use datafusion::execution::runtime_env::RuntimeEnv;
2524
use datafusion::execution::session_state::SessionStateBuilder;
26-
use datafusion::execution::TaskContext;
27-
use datafusion::physical_plan::{execute_stream, visit_execution_plan, ExecutionPlanVisitor};
25+
use datafusion::execution::SendableRecordBatchStream;
26+
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
2827
use datafusion::prelude::*;
2928
use datafusion::sql::parser::Statement;
30-
use datafusion::{arrow::util::pretty::pretty_format_batches, physical_plan::ExecutionPlan};
3129
#[cfg(feature = "deltalake")]
3230
use deltalake::delta_datafusion::DeltaTableFactory;
33-
use log::{error, info};
34-
use tokio::sync::mpsc::UnboundedSender;
31+
use log::info;
3532
use tokio_stream::StreamExt;
3633
#[cfg(feature = "s3")]
3734
use url::Url;
@@ -42,8 +39,6 @@ use {
4239
};
4340

4441
use crate::app::config::ExecutionConfig;
45-
use crate::app::state::tabs::sql::Query;
46-
use crate::app::AppEvent;
4742

4843
/// Structure for executing queries either locally or remotely (via FlightSQL)
4944
///
@@ -97,7 +92,7 @@ impl ExecutionContext {
9792
}
9893
}
9994
Err(e) => {
100-
error!("Error creating object store: {:?}", e);
95+
log::error!("Error creating object store: {:?}", e);
10196
}
10297
}
10398
}
@@ -145,123 +140,44 @@ impl ExecutionContext {
145140
&self.flightsql_client
146141
}
147142

148-
pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
149-
// We need to filter out empty strings to correctly determine the last query for displaying
150-
// results.
151-
info!("Running sqls: {:?}", sqls);
152-
let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
153-
info!("Non empty SQLs: {:?}", non_empty_sqls);
154-
let statement_count = non_empty_sqls.len();
155-
for (i, sql) in non_empty_sqls.into_iter().enumerate() {
156-
info!("Running query {}", i);
157-
let _sender = sender.clone();
158-
let mut query =
159-
Query::new(sql.to_string(), None, None, None, Duration::default(), None);
160-
let start = std::time::Instant::now();
161-
if i == statement_count - 1 {
162-
info!("Executing last query and display results");
163-
match self.session_ctx.sql(sql).await {
164-
Ok(df) => {
165-
if let Ok(physical_plan) = df.create_physical_plan().await {
166-
let stream_cfg = SessionConfig::default();
167-
let stream_task_ctx =
168-
TaskContext::default().with_session_config(stream_cfg);
169-
let mut stream =
170-
execute_stream(physical_plan, stream_task_ctx.into()).unwrap();
171-
let mut batches = Vec::new();
172-
while let Some(maybe_batch) = stream.next().await {
173-
match maybe_batch {
174-
Ok(batch) => {
175-
batches.push(batch);
176-
}
177-
Err(e) => {
178-
let elapsed = start.elapsed();
179-
query.set_error(Some(e.to_string()));
180-
query.set_execution_time(elapsed);
181-
break;
182-
}
183-
}
184-
}
185-
let elapsed = start.elapsed();
186-
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
187-
query.set_results(Some(batches));
188-
query.set_num_rows(Some(rows));
189-
query.set_execution_time(elapsed);
190-
}
191-
}
192-
Err(e) => {
193-
error!("Error creating dataframe: {:?}", e);
194-
let elapsed = start.elapsed();
195-
query.set_error(Some(e.to_string()));
196-
query.set_execution_time(elapsed);
197-
}
198-
}
199-
} else {
200-
match self.execute_sql(sql, false).await {
201-
Ok(_) => {
202-
let elapsed = start.elapsed();
203-
query.set_execution_time(elapsed);
204-
}
205-
Err(e) => {
206-
// We only log failed queries, we don't want to stop the execution of the
207-
// remaining queries. Perhaps there should be a configuration option for
208-
// this though in case the user wants to stop execution on the first error.
209-
error!("Error executing {sql}: {:?}", e);
210-
}
211-
}
212-
}
213-
_sender.send(AppEvent::QueryResult(query)).unwrap();
143+
/// Executes the specified sql string, driving it to completion but discarding any results
144+
pub async fn execute_sql_and_discard_results(
145+
&self,
146+
sql: &str,
147+
) -> datafusion::error::Result<()> {
148+
let mut stream = self.execute_sql(sql).await?;
149+
// note we don't call collect() to avoid buffering data
150+
while let Some(maybe_batch) = stream.next().await {
151+
maybe_batch?; // check for errors
214152
}
215153
Ok(())
216154
}
217155

218-
/// Executes the specified parsed DataFusion statement and discards the result
219-
pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> {
220-
let df = self.session_ctx.sql(sql).await?;
221-
self.execute_stream_dataframe(df, print).await
156+
/// Executes the specified sql string, returning the resulting
157+
/// [`SendableRecordBatchStream`] of results
158+
pub async fn execute_sql(
159+
&self,
160+
sql: &str,
161+
) -> datafusion::error::Result<SendableRecordBatchStream> {
162+
self.session_ctx.sql(sql).await?.execute_stream().await
222163
}
223164

224-
/// Executes the specified parsed DataFusion statement and prints the result to stdout
225-
pub async fn execute_and_print_statement(&self, statement: Statement) -> Result<()> {
165+
/// Executes the a pre-parsed DataFusion [`Statement`], returning the
166+
/// resulting [`SendableRecordBatchStream`] of results
167+
pub async fn execute_statement(
168+
&self,
169+
statement: Statement,
170+
) -> datafusion::error::Result<SendableRecordBatchStream> {
226171
let plan = self
227172
.session_ctx
228173
.state()
229174
.statement_to_plan(statement)
230175
.await?;
231-
let df = self.session_ctx.execute_logical_plan(plan).await?;
232-
self.execute_stream_dataframe(df, true).await
233-
}
234-
235-
/// Executes the specified query and prints the result to stdout
236-
pub async fn execute_and_print_stream_sql(&self, query: &str) -> Result<()> {
237-
let df = self.session_ctx.sql(query).await?;
238-
self.execute_stream_dataframe(df, true).await
239-
}
240-
241-
pub async fn execute_stream_dataframe(&self, df: DataFrame, print: bool) -> Result<()> {
242-
let physical_plan = df.create_physical_plan().await?;
243-
let stream_cfg = SessionConfig::default();
244-
let stream_task_ctx = TaskContext::default().with_session_config(stream_cfg);
245-
let mut stream = execute_stream(physical_plan, stream_task_ctx.into()).unwrap();
246-
247-
while let Some(maybe_batch) = stream.next().await {
248-
if print {
249-
let batch = maybe_batch.unwrap();
250-
let d = pretty_format_batches(&[batch]).unwrap();
251-
println!("{}", d);
252-
} else {
253-
let _ = maybe_batch.unwrap();
254-
info!("Discarding batch");
255-
}
256-
}
257-
Ok(())
258-
}
259-
260-
pub async fn show_catalog(&self) -> Result<()> {
261-
let tables = self.session_ctx.sql("SHOW tables").await?.collect().await?;
262-
let formatted = pretty_format_batches(&tables).unwrap();
263-
println!("{}", formatted);
264-
Ok(())
176+
self.session_ctx
177+
.execute_logical_plan(plan)
178+
.await?
179+
.execute_stream()
180+
.await
265181
}
266182
}
267183

0 commit comments

Comments
 (0)