Skip to content

Extract ExecutionContext to its own module and cli code to CliApp #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/handlers/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
let execution = Arc::clone(&app.execution);
let _event_tx = app.app_event_tx.clone();
tokio::spawn(async move {
let client = &execution.flightsql_client;
let client = execution.flightsql_client();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just made the fields non-pub and added accessors so the API surface area is clearer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, makes sense. I had started to use that approach for more recent work, but I know there's some remaining pub stuff

let mut query =
FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
let start = Instant::now();
Expand Down
4 changes: 2 additions & 2 deletions src/app/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
AppEvent::ExecuteDDL(ddl) => {
let queries: Vec<String> = ddl.split(';').map(|s| s.to_string()).collect();
queries.into_iter().for_each(|q| {
let ctx = app.execution.session_ctx.clone();
let ctx = app.execution.session_ctx().clone();
tokio::spawn(async move {
match ctx.sql(&q).await {
Ok(df) => {
Expand Down Expand Up @@ -208,7 +208,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
let url: &'static str = Box::leak(url.into_boxed_str());
let execution = Arc::clone(&app.execution);
tokio::spawn(async move {
let client = &execution.flightsql_client;
let client = execution.flightsql_client();
let maybe_channel = Channel::from_static(url).connect().await;
info!("Created channel");
match maybe_channel {
Expand Down
9 changes: 3 additions & 6 deletions src/app/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ use log::{error, info};
use ratatui::crossterm::event::{KeyCode, KeyEvent, KeyModifiers};
use tokio_stream::StreamExt;

use crate::app::{
execution::collect_plan_stats, handlers::tab_navigation_handler, state::tabs::sql::Query,
AppEvent,
};

use super::App;
use crate::app::{handlers::tab_navigation_handler, state::tabs::sql::Query, AppEvent};
use crate::execution::collect_plan_stats;

pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
match key.code {
Expand Down Expand Up @@ -89,7 +86,7 @@ pub fn editable_handler(app: &mut App, key: KeyEvent) {
(KeyCode::Esc, _) => app.state.sql_tab.exit_edit(),
(KeyCode::Enter, KeyModifiers::CONTROL) => {
let query = app.state.sql_tab.editor().lines().join("");
let ctx = app.execution.session_ctx.clone();
let ctx = app.execution.session_ctx().clone();
let _event_tx = app.app_event_tx.clone();
// TODO: Maybe this should be on a separate runtime to prevent blocking main thread /
// runtime
Expand Down
145 changes: 79 additions & 66 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

pub mod config;
pub mod execution;
pub mod handlers;
pub mod state;

Expand Down Expand Up @@ -45,9 +44,9 @@ use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;

use self::execution::ExecutionContext;
use self::handlers::{app_event_handler, crossterm_event_handler};
use self::state::tabs::sql::Query;
use crate::execution::ExecutionContext;

#[cfg(feature = "flightsql")]
use self::state::tabs::flightsql::FlightSQLQuery;
Expand Down Expand Up @@ -328,84 +327,98 @@ pub async fn run_app(cli: cli::DftCli, state: state::AppState<'_>) -> Result<()>
app.exit()
}

pub async fn execute_files_or_commands(
files: Vec<PathBuf>,
commands: Vec<String>,
state: &state::AppState<'_>,
) -> Result<()> {
match (files.is_empty(), commands.is_empty()) {
(true, true) => Err(eyre!("No files or commands provided to execute")),
(false, true) => execute_files(files, state).await,
(true, false) => execute_commands(commands, state).await,
(false, false) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
}
/// Encapsulates the command line interface
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored a bunch of free functions into a struct (which I plan to move to its own module soon)

pub struct CliApp {
/// Execution context for running queries
execution: ExecutionContext,
}
async fn execute_files(files: Vec<PathBuf>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing files: {:?}", files);
let execution = ExecutionContext::new(state.config.execution.clone());

for file in files {
exec_from_file(&execution, &file).await?
impl CliApp {
pub fn new(state: state::AppState<'static>) -> Self {
let execution = ExecutionContext::new(state.config.execution.clone());

Self { execution }
}

Ok(())
}
async fn execute_commands(commands: Vec<String>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing commands: {:?}", commands);
for command in commands {
exec_from_string(&command, state).await?
pub async fn execute_files_or_commands(
&self,
files: Vec<PathBuf>,
commands: Vec<String>,
) -> Result<()> {
match (files.is_empty(), commands.is_empty()) {
(true, true) => Err(eyre!("No files or commands provided to execute")),
(false, true) => self.execute_files(files).await,
(true, false) => self.execute_commands(commands).await,
(false, false) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
}
}

Ok(())
}
async fn execute_files(&self, files: Vec<PathBuf>) -> Result<()> {
info!("Executing files: {:?}", files);
for file in files {
self.exec_from_file(&file).await?
}

async fn exec_from_string(sql: &str, state: &state::AppState<'_>) -> Result<()> {
let dialect = GenericDialect {};
let execution = ExecutionContext::new(state.config.execution.clone());
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
for statement in statements {
execution.execute_and_print_statement(statement).await?;
Ok(())
}
Ok(())
}

/// run and execute SQL statements and commands from a file, against a context
/// with the given print options
pub async fn exec_from_file(ctx: &ExecutionContext, file: &Path) -> Result<()> {
let file = File::open(file)?;
let reader = BufReader::new(file);
async fn execute_commands(&self, commands: Vec<String>) -> Result<()> {
info!("Executing commands: {:?}", commands);
for command in commands {
self.exec_from_string(&command).await?
}

let mut query = String::new();
Ok(())
}

for line in reader.lines() {
let line = line?;
if line.starts_with("#!") {
continue;
async fn exec_from_string(&self, sql: &str) -> Result<()> {
let dialect = GenericDialect {};
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
for statement in statements {
self.execution
.execute_and_print_statement(statement)
.await?;
}
if line.starts_with("--") {
continue;
Ok(())
}

/// run and execute SQL statements and commands from a file, against a context
/// with the given print options
pub async fn exec_from_file(&self, file: &Path) -> Result<()> {
let file = File::open(file)?;
let reader = BufReader::new(file);

let mut query = String::new();

for line in reader.lines() {
let line = line?;
if line.starts_with("#!") {
continue;
}
if line.starts_with("--") {
continue;
}

let line = line.trim_end();
query.push_str(line);
// if we found the end of a query, run it
if line.ends_with(';') {
// TODO: if the query errors, should we keep trying to execute
// the other queries in the file? That is what datafusion-cli does...
self.execution.execute_and_print_stream_sql(&query).await?;
query.clear();
} else {
query.push('\n');
}
}

let line = line.trim_end();
query.push_str(line);
// if we found the end of a query, run it
if line.ends_with(';') {
// TODO: if the query errors, should we keep trying to execute
// the other queries in the file? That is what datafusion-cli does...
ctx.execute_and_print_stream_sql(&query).await?;
query.clear();
} else {
query.push('\n');
// run the last line(s) in file if the last statement doesn't contain ‘;’
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
self.execution.execute_and_print_stream_sql(&query).await?;
}
}

// run the last line(s) in file if the last statement doesn't contain ‘;’
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
ctx.execute_and_print_stream_sql(&query).await?;
Ok(())
}

Ok(())
}
2 changes: 1 addition & 1 deletion src/app/state/tabs/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ratatui::style::Style;
use ratatui::widgets::TableState;
use tui_textarea::TextArea;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct FlightSQLQuery {
Expand Down
2 changes: 1 addition & 1 deletion src/app/state/tabs/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;

use ratatui::widgets::TableState;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Debug)]
pub enum Context {
Expand Down
2 changes: 1 addition & 1 deletion src/app/state/tabs/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ratatui::style::Style;
use ratatui::widgets::TableState;
use tui_textarea::TextArea;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct Query {
Expand Down
45 changes: 32 additions & 13 deletions src/app/execution.rs → src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
//!
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -31,7 +33,6 @@ use deltalake::delta_datafusion::DeltaTableFactory;
use log::{error, info};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "s3")]
use url::Url;
#[cfg(feature = "flightsql")]
Expand All @@ -40,21 +41,34 @@ use {
tonic::transport::Channel,
};

use super::config::ExecutionConfig;
use super::state::tabs::sql::Query;
use super::AppEvent;
use crate::app::config::ExecutionConfig;
use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;

/// Structure for executing queries either locally or remotely (via FlightSQL)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core reason for this PR -- to move this structure to its own module

It still depends on app::AppEvent as well as printing to stdout. I plan to move the display of results code into different structures in the next PR

///
/// This context includes both:
///
/// 1. The configuration of a [`SessionContext`] with various extensions enabled
///
/// 2. The code for running SQL queries
///
/// The design goals for this module are to serve as an example of how to integrate
/// DataFusion into an application and to provide a simple interface for running SQL queries
/// with the various extensions enabled.
///
/// Thus it is important that it (eventually) does not depend on the code in the app crate
pub struct ExecutionContext {
pub session_ctx: SessionContext,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I made these non-pub, it turned out that many of them aren't used, so I removed the unused fields for now

pub config: ExecutionConfig,
pub cancellation_token: CancellationToken,
session_ctx: SessionContext,
#[cfg(feature = "flightsql")]
pub flightsql_client: Mutex<Option<FlightSqlServiceClient<Channel>>>,
flightsql_client: Mutex<Option<FlightSqlServiceClient<Channel>>>,
}

impl ExecutionContext {
#[allow(unused_mut)]
/// Construct a new `ExecutionContext` with the specified configuration
pub fn new(config: ExecutionConfig) -> Self {
let _ = &config; // avoid unused variable warning (it is used when some features are enabled)

let cfg = SessionConfig::default()
.with_batch_size(1)
.with_information_schema(true);
Expand Down Expand Up @@ -91,6 +105,7 @@ impl ExecutionContext {
}
}

#[allow(unused_mut)] // used when deltalake is enabled
let mut state = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(runtime_env.into())
Expand All @@ -106,12 +121,9 @@ impl ExecutionContext {

{
let session_ctx = SessionContext::new_with_state(state);
let cancellation_token = CancellationToken::new();

Self {
config,
session_ctx,
cancellation_token,
#[cfg(feature = "flightsql")]
flightsql_client: Mutex::new(None),
}
Expand All @@ -122,10 +134,17 @@ impl ExecutionContext {
Ok(())
}

/// Return the inner DataFusion [`SessionContext`]
pub fn session_ctx(&self) -> &SessionContext {
&self.session_ctx
}

/// Return a handle to the underlying FlightSQL client, if any
#[cfg(feature = "flightsql")]
pub fn flightsql_client(&self) -> &Mutex<Option<FlightSqlServiceClient<Channel>>> {
&self.flightsql_client
}

pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
// We need to filter out empty strings to correctly determine the last query for displaying
// results.
Expand Down Expand Up @@ -196,7 +215,7 @@ impl ExecutionContext {
Ok(())
}

/// Execcutes the specified parsed DataFusion statement and discards the result
/// Executes the specified parsed DataFusion statement and discards the result
pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> {
let df = self.session_ctx.sql(sql).await?;
self.execute_stream_dataframe(df, print).await
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod app;
pub mod cli;
pub mod execution;
pub mod telemetry;
pub mod ui;
Loading
Loading