18 | 18 | //! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
19 | 19 | //!
20 | 20 | use std::sync::Arc;
21 | | -use std::time::Duration;
22 | 21 |
23 | 22 | use color_eyre::eyre::Result;
24 | 23 | use datafusion::execution::runtime_env::RuntimeEnv;
25 | 24 | use datafusion::execution::session_state::SessionStateBuilder;
26 | | -use datafusion::execution::TaskContext;
27 | | -use datafusion::physical_plan::{execute_stream, visit_execution_plan, ExecutionPlanVisitor};
| 25 | +use datafusion::execution::SendableRecordBatchStream;
| 26 | +use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
28 | 27 | use datafusion::prelude::*;
29 | 28 | use datafusion::sql::parser::Statement;
30 | | -use datafusion::{arrow::util::pretty::pretty_format_batches, physical_plan::ExecutionPlan};
31 | 29 | #[cfg(feature = "deltalake")]
32 | 30 | use deltalake::delta_datafusion::DeltaTableFactory;
33 | | -use log::{error, info};
34 | | -use tokio::sync::mpsc::UnboundedSender;
| 31 | +use log::info;
35 | 32 | use tokio_stream::StreamExt;
36 | 33 | #[cfg(feature = "s3")]
37 | 34 | use url::Url;

42 | 39 | };
43 | 40 |
44 | 41 | use crate::app::config::ExecutionConfig;
45 | | -use crate::app::state::tabs::sql::Query;
46 | | -use crate::app::AppEvent;
47 | 42 |
48 | 43 | /// Structure for executing queries either locally or remotely (via FlightSQL)
49 | 44 | ///
@@ -97,7 +92,7 @@ impl ExecutionContext {
97 | 92 | }
98 | 93 | }
99 | 94 | Err(e) => {
100 | | -error!("Error creating object store: {:?}", e);
| 95 | +log::error!("Error creating object store: {:?}", e);
101 | 96 | }
102 | 97 | }
103 | 98 | }
@@ -145,123 +140,44 @@ impl ExecutionContext {
145 | 140 |         &self.flightsql_client
146 | 141 |     }
147 | 142 |
148 | | -    pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
149 | | -        // We need to filter out empty strings to correctly determine the last query for displaying
150 | | -        // results.
151 | | -        info!("Running sqls: {:?}", sqls);
152 | | -        let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
153 | | -        info!("Non empty SQLs: {:?}", non_empty_sqls);
154 | | -        let statement_count = non_empty_sqls.len();
155 | | -        for (i, sql) in non_empty_sqls.into_iter().enumerate() {
156 | | -            info!("Running query {}", i);
157 | | -            let _sender = sender.clone();
158 | | -            let mut query =
159 | | -                Query::new(sql.to_string(), None, None, None, Duration::default(), None);
160 | | -            let start = std::time::Instant::now();
161 | | -            if i == statement_count - 1 {
162 | | -                info!("Executing last query and display results");
163 | | -                match self.session_ctx.sql(sql).await {
164 | | -                    Ok(df) => {
165 | | -                        if let Ok(physical_plan) = df.create_physical_plan().await {
166 | | -                            let stream_cfg = SessionConfig::default();
167 | | -                            let stream_task_ctx =
168 | | -                                TaskContext::default().with_session_config(stream_cfg);
169 | | -                            let mut stream =
170 | | -                                execute_stream(physical_plan, stream_task_ctx.into()).unwrap();
171 | | -                            let mut batches = Vec::new();
172 | | -                            while let Some(maybe_batch) = stream.next().await {
173 | | -                                match maybe_batch {
174 | | -                                    Ok(batch) => {
175 | | -                                        batches.push(batch);
176 | | -                                    }
177 | | -                                    Err(e) => {
178 | | -                                        let elapsed = start.elapsed();
179 | | -                                        query.set_error(Some(e.to_string()));
180 | | -                                        query.set_execution_time(elapsed);
181 | | -                                        break;
182 | | -                                    }
183 | | -                                }
184 | | -                            }
185 | | -                            let elapsed = start.elapsed();
186 | | -                            let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
187 | | -                            query.set_results(Some(batches));
188 | | -                            query.set_num_rows(Some(rows));
189 | | -                            query.set_execution_time(elapsed);
190 | | -                        }
191 | | -                    }
192 | | -                    Err(e) => {
193 | | -                        error!("Error creating dataframe: {:?}", e);
194 | | -                        let elapsed = start.elapsed();
195 | | -                        query.set_error(Some(e.to_string()));
196 | | -                        query.set_execution_time(elapsed);
197 | | -                    }
198 | | -                }
199 | | -            } else {
200 | | -                match self.execute_sql(sql, false).await {
201 | | -                    Ok(_) => {
202 | | -                        let elapsed = start.elapsed();
203 | | -                        query.set_execution_time(elapsed);
204 | | -                    }
205 | | -                    Err(e) => {
206 | | -                        // We only log failed queries, we don't want to stop the execution of the
207 | | -                        // remaining queries. Perhaps there should be a configuration option for
208 | | -                        // this though in case the user wants to stop execution on the first error.
209 | | -                        error!("Error executing {sql}: {:?}", e);
210 | | -                    }
211 | | -                }
212 | | -            }
213 | | -            _sender.send(AppEvent::QueryResult(query)).unwrap();
| 143 | +    /// Executes the specified sql string, driving it to completion but discarding any results
| 144 | +    pub async fn execute_sql_and_discard_results(
| 145 | +        &self,
| 146 | +        sql: &str,
| 147 | +    ) -> datafusion::error::Result<()> {
| 148 | +        let mut stream = self.execute_sql(sql).await?;
| 149 | +        // note we don't call collect() to avoid buffering data
| 150 | +        while let Some(maybe_batch) = stream.next().await {
| 151 | +            maybe_batch?; // check for errors
214 | 152 |         }
215 | 153 |         Ok(())
216 | 154 |     }
217 | 155 |
218 | | -    /// Executes the specified parsed DataFusion statement and discards the result
219 | | -    pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> {
220 | | -        let df = self.session_ctx.sql(sql).await?;
221 | | -        self.execute_stream_dataframe(df, print).await
| 156 | +    /// Executes the specified sql string, returning the resulting
| 157 | +    /// [`SendableRecordBatchStream`] of results
| 158 | +    pub async fn execute_sql(
| 159 | +        &self,
| 160 | +        sql: &str,
| 161 | +    ) -> datafusion::error::Result<SendableRecordBatchStream> {
| 162 | +        self.session_ctx.sql(sql).await?.execute_stream().await
222 | 163 |     }
223 | 164 |
224 | | -    /// Executes the specified parsed DataFusion statement and prints the result to stdout
225 | | -    pub async fn execute_and_print_statement(&self, statement: Statement) -> Result<()> {
| 165 | +    /// Executes a pre-parsed DataFusion [`Statement`], returning the
| 166 | +    /// resulting [`SendableRecordBatchStream`] of results
| 167 | +    pub async fn execute_statement(
| 168 | +        &self,
| 169 | +        statement: Statement,
| 170 | +    ) -> datafusion::error::Result<SendableRecordBatchStream> {
226 | 171 |         let plan = self
227 | 172 |             .session_ctx
228 | 173 |             .state()
229 | 174 |             .statement_to_plan(statement)
230 | 175 |             .await?;
231 | | -        let df = self.session_ctx.execute_logical_plan(plan).await?;
232 | | -        self.execute_stream_dataframe(df, true).await
233 | | -    }
234 | | -
235 | | -    /// Executes the specified query and prints the result to stdout
236 | | -    pub async fn execute_and_print_stream_sql(&self, query: &str) -> Result<()> {
237 | | -        let df = self.session_ctx.sql(query).await?;
238 | | -        self.execute_stream_dataframe(df, true).await
239 | | -    }
240 | | -
241 | | -    pub async fn execute_stream_dataframe(&self, df: DataFrame, print: bool) -> Result<()> {
242 | | -        let physical_plan = df.create_physical_plan().await?;
243 | | -        let stream_cfg = SessionConfig::default();
244 | | -        let stream_task_ctx = TaskContext::default().with_session_config(stream_cfg);
245 | | -        let mut stream = execute_stream(physical_plan, stream_task_ctx.into()).unwrap();
246 | | -
247 | | -        while let Some(maybe_batch) = stream.next().await {
248 | | -            if print {
249 | | -                let batch = maybe_batch.unwrap();
250 | | -                let d = pretty_format_batches(&[batch]).unwrap();
251 | | -                println!("{}", d);
252 | | -            } else {
253 | | -                let _ = maybe_batch.unwrap();
254 | | -                info!("Discarding batch");
255 | | -            }
256 | | -        }
257 | | -        Ok(())
258 | | -    }
259 | | -
260 | | -    pub async fn show_catalog(&self) -> Result<()> {
261 | | -        let tables = self.session_ctx.sql("SHOW tables").await?.collect().await?;
262 | | -        let formatted = pretty_format_batches(&tables).unwrap();
263 | | -        println!("{}", formatted);
264 | | -        Ok(())
| 176 | +        self.session_ctx
| 177 | +            .execute_logical_plan(plan)
| 178 | +            .await?
| 179 | +            .execute_stream()
| 180 | +            .await
265 | 181 |     }
266 | 182 | }
267 | 183 |
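
For context, here is a minimal usage sketch of the reworked API. It is not part of this diff: the `run_example` function, the table name `t`, and the SQL strings are hypothetical, while `ExecutionContext`, `execute_sql`, and `execute_sql_and_discard_results` are the methods shown above.

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use tokio_stream::StreamExt;

// Hypothetical caller; `ctx` is an `ExecutionContext` (the struct from this
// file) constructed elsewhere in the application.
async fn run_example(ctx: &ExecutionContext) -> datafusion::error::Result<()> {
    // Setup statements are driven to completion without buffering any batches.
    ctx.execute_sql_and_discard_results("CREATE TABLE t AS VALUES (1), (2), (3)")
        .await?;

    // For a query whose output matters, the caller owns the stream and decides
    // whether to collect, print, or forward the batches.
    let mut stream = ctx.execute_sql("SELECT * FROM t").await?;
    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}
```

Returning a `SendableRecordBatchStream` lets each caller choose its own policy for printing, buffering, or discarding results, rather than baking that behavior into `ExecutionContext` as the removed `execute_stream_dataframe` and `run_sqls` did.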