diff --git a/src/app/handlers/flightsql.rs b/src/app/handlers/flightsql.rs index 65af60ae..944efdae 100644 --- a/src/app/handlers/flightsql.rs +++ b/src/app/handlers/flightsql.rs @@ -73,7 +73,7 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { let execution = Arc::clone(&app.execution); let _event_tx = app.app_event_tx.clone(); tokio::spawn(async move { - let client = &execution.flightsql_client; + let client = execution.flightsql_client(); let mut query = FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); let start = Instant::now(); diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 19b7f269..f3ff2186 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -161,7 +161,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { AppEvent::ExecuteDDL(ddl) => { let queries: Vec = ddl.split(';').map(|s| s.to_string()).collect(); queries.into_iter().for_each(|q| { - let ctx = app.execution.session_ctx.clone(); + let ctx = app.execution.session_ctx().clone(); tokio::spawn(async move { match ctx.sql(&q).await { Ok(df) => { @@ -208,7 +208,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { let url: &'static str = Box::leak(url.into_boxed_str()); let execution = Arc::clone(&app.execution); tokio::spawn(async move { - let client = &execution.flightsql_client; + let client = execution.flightsql_client(); let maybe_channel = Channel::from_static(url).connect().await; info!("Created channel"); match maybe_channel { diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 067a0c90..086a02af 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -22,12 +22,9 @@ use log::{error, info}; use ratatui::crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; use tokio_stream::StreamExt; -use crate::app::{ - execution::collect_plan_stats, handlers::tab_navigation_handler, state::tabs::sql::Query, - AppEvent, -}; - use super::App; +use 
crate::app::{handlers::tab_navigation_handler, state::tabs::sql::Query, AppEvent}; +use crate::execution::collect_plan_stats; pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { match key.code { @@ -89,7 +86,7 @@ pub fn editable_handler(app: &mut App, key: KeyEvent) { (KeyCode::Esc, _) => app.state.sql_tab.exit_edit(), (KeyCode::Enter, KeyModifiers::CONTROL) => { let query = app.state.sql_tab.editor().lines().join(""); - let ctx = app.execution.session_ctx.clone(); + let ctx = app.execution.session_ctx().clone(); let _event_tx = app.app_event_tx.clone(); // TODO: Maybe this should be on a separate runtime to prevent blocking main thread / // runtime diff --git a/src/app/mod.rs b/src/app/mod.rs index 5170a135..3e5bcb5f 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -16,7 +16,6 @@ // under the License. pub mod config; -pub mod execution; pub mod handlers; pub mod state; @@ -45,9 +44,9 @@ use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use self::execution::ExecutionContext; use self::handlers::{app_event_handler, crossterm_event_handler}; use self::state::tabs::sql::Query; +use crate::execution::ExecutionContext; #[cfg(feature = "flightsql")] use self::state::tabs::flightsql::FlightSQLQuery; @@ -328,84 +327,98 @@ pub async fn run_app(cli: cli::DftCli, state: state::AppState<'_>) -> Result<()> app.exit() } -pub async fn execute_files_or_commands( - files: Vec, - commands: Vec, - state: &state::AppState<'_>, -) -> Result<()> { - match (files.is_empty(), commands.is_empty()) { - (true, true) => Err(eyre!("No files or commands provided to execute")), - (false, true) => execute_files(files, state).await, - (true, false) => execute_commands(commands, state).await, - (false, false) => Err(eyre!( - "Cannot execute both files and commands at the same time" - )), - } +/// Encapsulates the command line interface +pub struct CliApp { + /// Execution context for running queries + execution: ExecutionContext, } -async 
fn execute_files(files: Vec, state: &state::AppState<'_>) -> Result<()> { - info!("Executing files: {:?}", files); - let execution = ExecutionContext::new(state.config.execution.clone()); - for file in files { - exec_from_file(&execution, &file).await? +impl CliApp { + pub fn new(state: state::AppState<'static>) -> Self { + let execution = ExecutionContext::new(state.config.execution.clone()); + + Self { execution } } - Ok(()) -} -async fn execute_commands(commands: Vec, state: &state::AppState<'_>) -> Result<()> { - info!("Executing commands: {:?}", commands); - for command in commands { - exec_from_string(&command, state).await? + pub async fn execute_files_or_commands( + &self, + files: Vec, + commands: Vec, + ) -> Result<()> { + match (files.is_empty(), commands.is_empty()) { + (true, true) => Err(eyre!("No files or commands provided to execute")), + (false, true) => self.execute_files(files).await, + (true, false) => self.execute_commands(commands).await, + (false, false) => Err(eyre!( + "Cannot execute both files and commands at the same time" + )), + } } - Ok(()) -} + async fn execute_files(&self, files: Vec) -> Result<()> { + info!("Executing files: {:?}", files); + for file in files { + self.exec_from_file(&file).await? 
+ } -async fn exec_from_string(sql: &str, state: &state::AppState<'_>) -> Result<()> { - let dialect = GenericDialect {}; - let execution = ExecutionContext::new(state.config.execution.clone()); - let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?; - for statement in statements { - execution.execute_and_print_statement(statement).await?; + Ok(()) } - Ok(()) -} - -/// run and execute SQL statements and commands from a file, against a context -/// with the given print options -pub async fn exec_from_file(ctx: &ExecutionContext, file: &Path) -> Result<()> { - let file = File::open(file)?; - let reader = BufReader::new(file); + async fn execute_commands(&self, commands: Vec) -> Result<()> { + info!("Executing commands: {:?}", commands); + for command in commands { + self.exec_from_string(&command).await? + } - let mut query = String::new(); + Ok(()) + } - for line in reader.lines() { - let line = line?; - if line.starts_with("#!") { - continue; + async fn exec_from_string(&self, sql: &str) -> Result<()> { + let dialect = GenericDialect {}; + let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?; + for statement in statements { + self.execution + .execute_and_print_statement(statement) + .await?; } - if line.starts_with("--") { - continue; + Ok(()) + } + + /// run and execute SQL statements and commands from a file, against a context + /// with the given print options + pub async fn exec_from_file(&self, file: &Path) -> Result<()> { + let file = File::open(file)?; + let reader = BufReader::new(file); + + let mut query = String::new(); + + for line in reader.lines() { + let line = line?; + if line.starts_with("#!") { + continue; + } + if line.starts_with("--") { + continue; + } + + let line = line.trim_end(); + query.push_str(line); + // if we found the end of a query, run it + if line.ends_with(';') { + // TODO: if the query errors, should we keep trying to execute + // the other queries in the file? That is what datafusion-cli does... 
+ self.execution.execute_and_print_stream_sql(&query).await?; + query.clear(); + } else { + query.push('\n'); + } } - let line = line.trim_end(); - query.push_str(line); - // if we found the end of a query, run it - if line.ends_with(';') { - // TODO: if the query errors, should we keep trying to execute - // the other queries in the file? That is what datafusion-cli does... - ctx.execute_and_print_stream_sql(&query).await?; - query.clear(); - } else { - query.push('\n'); + // run the last line(s) in file if the last statement doesn't contain ‘;’ + // ignore if it only consists of '\n' + if query.contains(|c| c != '\n') { + self.execution.execute_and_print_stream_sql(&query).await?; } - } - // run the last line(s) in file if the last statement doesn't contain ‘;’ - // ignore if it only consists of '\n' - if query.contains(|c| c != '\n') { - ctx.execute_and_print_stream_sql(&query).await?; + Ok(()) } - - Ok(()) } diff --git a/src/app/state/tabs/flightsql.rs b/src/app/state/tabs/flightsql.rs index ac08aaf2..a49d92c2 100644 --- a/src/app/state/tabs/flightsql.rs +++ b/src/app/state/tabs/flightsql.rs @@ -25,7 +25,7 @@ use ratatui::style::Style; use ratatui::widgets::TableState; use tui_textarea::TextArea; -use crate::app::execution::ExecutionStats; +use crate::execution::ExecutionStats; #[derive(Clone, Debug)] pub struct FlightSQLQuery { diff --git a/src/app/state/tabs/history.rs b/src/app/state/tabs/history.rs index 9629dfd2..76461206 100644 --- a/src/app/state/tabs/history.rs +++ b/src/app/state/tabs/history.rs @@ -20,7 +20,7 @@ use std::time::Duration; use ratatui::widgets::TableState; -use crate::app::execution::ExecutionStats; +use crate::execution::ExecutionStats; #[derive(Debug)] pub enum Context { diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index 7c11223a..7bc633db 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -25,7 +25,7 @@ use ratatui::style::Style; use ratatui::widgets::TableState; use tui_textarea::TextArea; 
-use crate::app::execution::ExecutionStats; +use crate::execution::ExecutionStats; #[derive(Clone, Debug)] pub struct Query { diff --git a/src/app/execution.rs b/src/execution/mod.rs similarity index 88% rename from src/app/execution.rs rename to src/execution/mod.rs index 02c8cd6c..0e4be5ec 100644 --- a/src/app/execution.rs +++ b/src/execution/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries +//! use std::sync::Arc; use std::time::Duration; @@ -31,7 +33,6 @@ use deltalake::delta_datafusion::DeltaTableFactory; use log::{error, info}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::StreamExt; -use tokio_util::sync::CancellationToken; #[cfg(feature = "s3")] use url::Url; #[cfg(feature = "flightsql")] @@ -40,21 +41,34 @@ use { tonic::transport::Channel, }; -use super::config::ExecutionConfig; -use super::state::tabs::sql::Query; -use super::AppEvent; +use crate::app::config::ExecutionConfig; +use crate::app::state::tabs::sql::Query; +use crate::app::AppEvent; +/// Structure for executing queries either locally or remotely (via FlightSQL) +/// +/// This context includes both: +/// +/// 1. The configuration of a [`SessionContext`] with various extensions enabled +/// +/// 2. The code for running SQL queries +/// +/// The design goals for this module are to serve as an example of how to integrate +/// DataFusion into an application and to provide a simple interface for running SQL queries +/// with the various extensions enabled. 
+/// +/// Thus it is important (eventually) not to depend on the code in the app crate pub struct ExecutionContext { - pub session_ctx: SessionContext, - pub config: ExecutionConfig, - pub cancellation_token: CancellationToken, + session_ctx: SessionContext, #[cfg(feature = "flightsql")] - pub flightsql_client: Mutex>>, + flightsql_client: Mutex>>, } impl ExecutionContext { - #[allow(unused_mut)] + /// Construct a new `ExecutionContext` with the specified configuration pub fn new(config: ExecutionConfig) -> Self { + let _ = &config; // avoid unused variable warning (it is used when some features are enabled) + let cfg = SessionConfig::default() .with_batch_size(1) .with_information_schema(true); @@ -91,6 +105,7 @@ impl ExecutionContext { } } + #[allow(unused_mut)] // used when deltalake is enabled let mut state = SessionStateBuilder::new() .with_default_features() .with_runtime_env(runtime_env.into()) @@ -106,12 +121,9 @@ impl ExecutionContext { { let session_ctx = SessionContext::new_with_state(state); - let cancellation_token = CancellationToken::new(); Self { - config, session_ctx, - cancellation_token, #[cfg(feature = "flightsql")] flightsql_client: Mutex::new(None), } @@ -122,10 +134,17 @@ impl ExecutionContext { Ok(()) } + /// Return the inner DataFusion [`SessionContext`] pub fn session_ctx(&self) -> &SessionContext { &self.session_ctx } + /// Return a handle to the underlying FlightSQL client, if any + #[cfg(feature = "flightsql")] + pub fn flightsql_client(&self) -> &Mutex>> { + &self.flightsql_client + } + pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender) -> Result<()> { // We need to filter out empty strings to correctly determine the last query for displaying // results.
@@ -196,7 +215,7 @@ impl ExecutionContext { Ok(()) } - /// Execcutes the specified parsed DataFusion statement and discards the result + /// Executes the specified parsed DataFusion statement and discards the result pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> { let df = self.session_ctx.sql(sql).await?; self.execute_stream_dataframe(df, print).await diff --git a/src/lib.rs b/src/lib.rs index d6fc70e1..ed191eae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod app; pub mod cli; +pub mod execution; pub mod telemetry; pub mod ui; diff --git a/src/main.rs b/src/main.rs index d3813b13..a8fde044 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use clap::Parser; use color_eyre::Result; -use dft::app::{execute_files_or_commands, run_app, state}; +use dft::app::{run_app, state, CliApp}; use dft::cli; use dft::telemetry; @@ -25,13 +25,17 @@ use dft::telemetry; async fn main() -> Result<()> { let cli = cli::DftCli::parse(); - // If executing commands from files, do so and then exit + // CLI mode: executing commands from files or CLI arguments if !cli.files.is_empty() || !cli.commands.is_empty() { // use env_logger to setup logging for CLI env_logger::init(); let state = state::initialize(cli.clone()); - execute_files_or_commands(cli.files.clone(), cli.commands.clone(), &state).await?; - } else { + let app = CliApp::new(state); + app.execute_files_or_commands(cli.files.clone(), cli.commands.clone()) + .await?; + } + // UI mode: running the TUI + else { // use alternate logging for TUI telemetry::initialize_logs()?; let state = state::initialize(cli.clone());