From 3f900ac5e188ecc3b5098ab5a1c499631cb51247 Mon Sep 17 00:00:00 2001 From: "Dawei H." Date: Mon, 10 Feb 2025 23:29:44 +0800 Subject: [PATCH] Test all examples from library-user-guide & user-guide docs (#14544) * add mut annotation * fix rust examples * fix rust examples * update * fix first doctest * fix first doctest * fix more doctest * fix more doctest * fix more doctest * adopt rustdoc syntax * adopt rustdoc syntax * adopt rustdoc syntax * fix more doctest * add missing imports * final udtf * reenable * remove dep * run prettier * api-health * update doc * update doc * temp fix * fix doc * fix async schema provider * fix async schema provider * fix doc * fix doc * reorder * refactor * s * finish * minor update * add missing docs * add deps (#3) * fix doctest * update doc * fix doctest * fix doctest * tweak showkeys * fix doctest * fix doctest * fix doctest * fix doctest * update to use user_doc * add rustdoc preprocessing * fix dir * revert to original doc * add allocator * mark type * update * fix doctest * add doctest * add doctest * fix doctest * fix doctest * fix doctest * fix doctest * fix doctest * fix doctest * fix doctest * fix doctest * fix doctest * prettier format * revert change to datafusion-testing * add apache header * install cmake in setup-builder for ci workflow dependency * taplo + fix snmalloc * Update function docs * preprocess user-guide * Render examples as sql * fix intro * fix docs via script --------- Co-authored-by: Andrew Lamb --- .github/actions/setup-builder/action.yaml | 2 +- Cargo.lock | 3 + datafusion/core/Cargo.toml | 3 + .../core/src/bin/print_functions_docs.rs | 2 +- datafusion/core/src/lib.rs | 207 ++++- dev/update_function_docs.sh | 4 +- docs/build.sh | 3 + docs/rustdoc_trim.py | 75 ++ docs/source/library-user-guide/adding-udfs.md | 733 +++++++++++++++--- docs/source/library-user-guide/api-health.md | 4 +- docs/source/library-user-guide/catalogs.md | 147 +++- .../custom-table-providers.md | 407 +++++++++- .../library-user-guide/query-optimizer.md | 244 +++--- .../library-user-guide/working-with-exprs.md | 219 +++++- docs/source/user-guide/cli/usage.md | 4 +- docs/source/user-guide/crate-configuration.md | 4 +- docs/source/user-guide/explain-usage.md | 10 +- .../user-guide/sql/aggregate_functions.md | 74 +- docs/source/user-guide/sql/ddl.md | 6 +- docs/source/user-guide/sql/dml.md | 4 +- docs/source/user-guide/sql/explain.md | 6 +- .../source/user-guide/sql/scalar_functions.md | 308 ++++---- .../source/user-guide/sql/window_functions.md | 26 +- 23 files changed, 1983 insertions(+), 512 deletions(-) create mode 100644 docs/rustdoc_trim.py diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/setup-builder/action.yaml index 22d2f2187dd0..5c7ecd3e1edb 100644 --- a/.github/actions/setup-builder/action.yaml +++ b/.github/actions/setup-builder/action.yaml @@ -30,7 +30,7 @@ runs: run: | RETRY=("ci/scripts/retry" timeout 120) "${RETRY[@]}" apt-get update - "${RETRY[@]}" apt-get install -y protobuf-compiler + "${RETRY[@]}" apt-get install -y protobuf-compiler cmake - name: Setup Rust toolchain shell: bash # rustfmt is needed for the substrait build script diff --git a/Cargo.lock b/Cargo.lock index 953f4622be97..cd6586180d73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1730,10 +1730,12 @@ dependencies = [ "chrono", "criterion", "ctor", + "dashmap", "datafusion-catalog", "datafusion-catalog-listing", "datafusion-common", "datafusion-common-runtime", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", @@ 
-1742,6 +1744,7 @@ dependencies = [ "datafusion-functions-table", "datafusion-functions-window", "datafusion-functions-window-common", + "datafusion-macros", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c3c764ab1435..3c7287b887a2 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -95,6 +95,7 @@ datafusion-catalog = { workspace = true } datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } +datafusion-doc = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } @@ -102,6 +103,7 @@ datafusion-functions-aggregate = { workspace = true } datafusion-functions-nested = { workspace = true, optional = true } datafusion-functions-table = { workspace = true } datafusion-functions-window = { workspace = true } +datafusion-macros = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } @@ -130,6 +132,7 @@ zstd = { version = "0.13", optional = true, default-features = false } async-trait = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } ctor = { workspace = true } +dashmap = "6.1.0" datafusion-functions-window-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } doc-comment = { workspace = true } diff --git a/datafusion/core/src/bin/print_functions_docs.rs b/datafusion/core/src/bin/print_functions_docs.rs index 8b453d5e9698..7afb90282a80 100644 --- a/datafusion/core/src/bin/print_functions_docs.rs +++ b/datafusion/core/src/bin/print_functions_docs.rs @@ -193,7 +193,7 @@ fn print_docs( {} -``` +```sql {} ``` "#, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index b256ed38039a..48ee8e46bc0f 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -844,11 +844,17 @@ doc_comment::doctest!("../../../README.md", readme_example_test); // // For example, if `user_guide_expressions(line 123)` fails, // go to `docs/source/user-guide/expressions.md` to find the relevant problem. 
+// +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/concepts-readings-events.md", + user_guide_concepts_readings_events +); #[cfg(doctest)] doc_comment::doctest!( - "../../../docs/source/user-guide/example-usage.md", - user_guide_example_usage + "../../../docs/source/user-guide/configs.md", + user_guide_configs ); #[cfg(doctest)] @@ -859,14 +865,20 @@ doc_comment::doctest!( #[cfg(doctest)] doc_comment::doctest!( - "../../../docs/source/user-guide/configs.md", - user_guide_configs + "../../../docs/source/user-guide/dataframe.md", + user_guide_dataframe ); #[cfg(doctest)] doc_comment::doctest!( - "../../../docs/source/user-guide/dataframe.md", - user_guide_dataframe + "../../../docs/source/user-guide/example-usage.md", + user_guide_example_usage +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/explain-usage.md", + user_guide_explain_usage ); #[cfg(doctest)] @@ -875,16 +887,181 @@ doc_comment::doctest!( user_guide_expressions ); +#[cfg(doctest)] +doc_comment::doctest!("../../../docs/source/user-guide/faq.md", user_guide_faq); + #[cfg(doctest)] doc_comment::doctest!( - "../../../docs/source/library-user-guide/using-the-sql-api.md", - library_user_guide_sql_api + "../../../docs/source/user-guide/introduction.md", + user_guide_introduction +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/cli/datasources.md", + user_guide_cli_datasource +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/cli/installation.md", + user_guide_cli_installation +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/cli/overview.md", + user_guide_cli_overview +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/cli/usage.md", + user_guide_cli_usage +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/aggregate_functions.md", + user_guide_sql_aggregate_functions +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/data_types.md", + user_guide_sql_data_types +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/ddl.md", + user_guide_sql_ddl +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/dml.md", + user_guide_sql_dml +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/explain.md", + user_guide_sql_exmplain +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/information_schema.md", + user_guide_sql_information_schema +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/operators.md", + user_guide_sql_operators +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/scalar_functions.md", + user_guide_sql_scalar_functions +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/select.md", + user_guide_sql_select +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/special_functions.md", + user_guide_sql_special_functions +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/sql_status.md", + user_guide_sql_status +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/subqueries.md", + user_guide_sql_subqueries +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/window_functions.md", + user_guide_sql_window_functions +); + 
+#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/sql/write_options.md", + user_guide_sql_write_options +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/adding-udfs.md", + library_user_guide_adding_udfs +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/api-health.md", + library_user_guide_api_health ); #[cfg(doctest)] doc_comment::doctest!( "../../../docs/source/library-user-guide/building-logical-plans.md", - library_user_guide_logical_plans + library_user_guide_building_logical_plans +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/catalogs.md", + library_user_guide_catalogs +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/custom-table-providers.md", + library_user_guide_custom_table_providers +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/extending-operators.md", + library_user_guide_extending_operators +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/extensions.md", + library_user_guide_extensions +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/index.md", + library_user_guide_index +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/profiling.md", + library_user_guide_profiling +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/query-optimizer.md", + library_user_guide_query_optimizer ); #[cfg(doctest)] @@ -892,3 +1069,15 @@ doc_comment::doctest!( "../../../docs/source/library-user-guide/using-the-dataframe-api.md", library_user_guide_dataframe_api ); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/using-the-sql-api.md", + library_user_guide_sql_api +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/working-with-exprs.md", + library_user_guide_working_with_exprs +); diff --git a/dev/update_function_docs.sh b/dev/update_function_docs.sh index 205ab41984a5..a9e87aacf5ad 100755 --- a/dev/update_function_docs.sh +++ b/dev/update_function_docs.sh @@ -236,7 +236,7 @@ WINDOW w AS (PARTITION BY depname ORDER BY salary DESC); The syntax for the OVER-clause is -``` +```sql function([expr]) OVER( [PARTITION BY expr[, …]] @@ -247,7 +247,7 @@ function([expr]) where **frame_clause** is one of: -``` +```sql { RANGE | ROWS | GROUPS } frame_start { RANGE | ROWS | GROUPS } BETWEEN frame_start AND frame_end ``` diff --git a/docs/build.sh b/docs/build.sh index 3fdcd0327024..14464fab40ea 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -25,4 +25,7 @@ mkdir temp cp -rf source/* temp/ # replace relative URLs with absolute URLs sed -i -e 's/\.\.\/\.\.\/\.\.\//https:\/\/github.com\/apache\/arrow-datafusion\/blob\/main\//g' temp/contributor-guide/index.md + +python rustdoc_trim.py + make SOURCEDIR=`pwd`/temp html diff --git a/docs/rustdoc_trim.py b/docs/rustdoc_trim.py new file mode 100644 index 000000000000..7ea96dbb44a5 --- /dev/null +++ b/docs/rustdoc_trim.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import re + +from pathlib import Path + +# Regex pattern to match Rust code blocks in Markdown +RUST_CODE_BLOCK_PATTERN = re.compile(r"```rust\s*(.*?)```", re.DOTALL) + + +def remove_hashtag_lines_in_rust_blocks(markdown_content): + """ + Removes lines starting with '# ' in Rust code blocks within a Markdown string. + """ + + def _process_code_block(match): + # Extract the code block content + code_block_content = match.group(1).strip() + + # Remove lines starting with '#' + modified_code_block = "\n".join( + line + for line in code_block_content.splitlines() + if (not line.lstrip().startswith("# ")) and line.strip() != "#" + ) + + # Return the modified code block wrapped in triple backticks + return f"```rust\n{modified_code_block}\n```" + + # Replace all Rust code blocks using the _process_code_block function + return RUST_CODE_BLOCK_PATTERN.sub(_process_code_block, markdown_content) + + +# Example usage +def process_markdown_file(file_path): + # Read the Markdown file + with open(file_path, "r", encoding="utf-8") as file: + markdown_content = file.read() + + # Remove lines starting with '#' in Rust code blocks + updated_markdown_content = remove_hashtag_lines_in_rust_blocks(markdown_content) + + # Write the updated content back to the Markdown file + with open(file_path, "w", encoding="utf-8") as file: + file.write(updated_markdown_content) + + print(f"Done processing file: {file_path}") + + +root_directory = Path("./temp/library-user-guide") +for file_path in root_directory.rglob("*.md"): + print(f"Processing file: {file_path}") + process_markdown_file(file_path) + +root_directory = Path("./temp/user-guide") +for file_path in root_directory.rglob("*.md"): + print(f"Processing file: {file_path}") + process_markdown_file(file_path) + +print("All Markdown files processed.") diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md index a9202976973b..a365ef6696a3 100644 --- a/docs/source/library-user-guide/adding-udfs.md +++ b/docs/source/library-user-guide/adding-udfs.md @@ -55,46 +55,62 @@ of arguments. This a lower level API with more functionality but is more complex, also documented in [`advanced_udf.rs`]. 
```rust +use std::sync::Arc; use std::any::Any; +use std::sync::LazyLock; use arrow::datatypes::DataType; +use datafusion_common::cast::as_int64_array; use datafusion_common::{DataFusionError, plan_err, Result}; -use datafusion_expr::{col, ColumnarValue, Signature, Volatility}; +use datafusion_expr::{col, ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; +use datafusion::arrow::array::{ArrayRef, Int64Array}; use datafusion_expr::{ScalarUDFImpl, ScalarUDF}; - +use datafusion_macros::user_doc; +use datafusion_doc::Documentation; + +/// This struct for a simple UDF that adds one to an int32 +#[user_doc( + doc_section(label = "Math Functions"), + description = "Add one udf", + syntax_example = "add_one(1)" +)] #[derive(Debug)] struct AddOne { - signature: Signature -}; + signature: Signature, +} impl AddOne { - fn new() -> Self { - Self { - signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable) - } - } + fn new() -> Self { + Self { + signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable), + } + } } /// Implement the ScalarUDFImpl trait for AddOne impl ScalarUDFImpl for AddOne { - fn as_any(&self) -> &dyn Any { self } - fn name(&self) -> &str { "add_one" } - fn signature(&self) -> &Signature { &self.signature } - fn return_type(&self, args: &[DataType]) -> Result { - if !matches!(args.get(0), Some(&DataType::Int32)) { - return plan_err!("add_one only accepts Int32 arguments"); - } - Ok(DataType::Int32) - } - // The actual implementation would add one to the argument - fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result { - let args = columnar_values_to_array(args)?; + fn as_any(&self) -> &dyn Any { self } + fn name(&self) -> &str { "add_one" } + fn signature(&self) -> &Signature { &self.signature } + fn return_type(&self, args: &[DataType]) -> Result { + if !matches!(args.get(0), Some(&DataType::Int32)) { + return plan_err!("add_one only accepts Int32 arguments"); + } + Ok(DataType::Int32) + } + // The actual implementation would add one to the argument + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; let i64s = as_int64_array(&args[0])?; let new_array = i64s .iter() .map(|array_elem| array_elem.map(|value| value + 1)) .collect::(); - Ok(Arc::new(new_array)) + + Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) + } + fn documentation(&self) -> Option<&Documentation> { + self.doc() } } ``` @@ -102,15 +118,75 @@ impl ScalarUDFImpl for AddOne { We now need to register the function with DataFusion so that it can be used in the context of a query. 
```rust +# use std::sync::Arc; +# use std::any::Any; +# use std::sync::LazyLock; +# use arrow::datatypes::DataType; +# use datafusion_common::cast::as_int64_array; +# use datafusion_common::{DataFusionError, plan_err, Result}; +# use datafusion_expr::{col, ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion_expr::{ScalarUDFImpl, ScalarUDF}; +# use datafusion_macros::user_doc; +# use datafusion_doc::Documentation; +# +# /// This struct for a simple UDF that adds one to an int32 +# #[user_doc( +# doc_section(label = "Math Functions"), +# description = "Add one udf", +# syntax_example = "add_one(1)" +# )] +# #[derive(Debug)] +# struct AddOne { +# signature: Signature, +# } +# +# impl AddOne { +# fn new() -> Self { +# Self { +# signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable), +# } +# } +# } +# +# /// Implement the ScalarUDFImpl trait for AddOne +# impl ScalarUDFImpl for AddOne { +# fn as_any(&self) -> &dyn Any { self } +# fn name(&self) -> &str { "add_one" } +# fn signature(&self) -> &Signature { &self.signature } +# fn return_type(&self, args: &[DataType]) -> Result { +# if !matches!(args.get(0), Some(&DataType::Int32)) { +# return plan_err!("add_one only accepts Int32 arguments"); +# } +# Ok(DataType::Int32) +# } +# // The actual implementation would add one to the argument +# fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { +# let args = ColumnarValue::values_to_arrays(&args.args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } +# fn documentation(&self) -> Option<&Documentation> { +# self.doc() +# } +# } +use datafusion::execution::context::SessionContext; + // Create a new ScalarUDF from the implementation let add_one = ScalarUDF::from(AddOne::new()); +// Call the function `add_one(col)` +let expr = add_one.call(vec![col("a")]); + // register the UDF with the context so it can be invoked by name and from SQL let mut ctx = SessionContext::new(); ctx.register_udf(add_one.clone()); - -// Call the function `add_one(col)` -let expr = add_one.call(vec![col("a")]); ``` ### Adding a Scalar UDF by [`create_udf`] @@ -121,7 +197,6 @@ There is a an older, more concise, but also more limited API [`create_udf`] avai ```rust use std::sync::Arc; - use datafusion::arrow::array::{ArrayRef, Int64Array}; use datafusion::common::cast::as_int64_array; use datafusion::common::Result; @@ -145,6 +220,24 @@ This "works" in isolation, i.e. if you have a slice of `ArrayRef`s, you can call `ArrayRef` with 1 added to each value. ```rust +# use std::sync::Arc; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::common::cast::as_int64_array; +# use datafusion::common::Result; +# use datafusion::logical_expr::ColumnarValue; +# +# pub fn add_one(args: &[ColumnarValue]) -> Result { +# // Error handling omitted for brevity +# let args = ColumnarValue::values_to_arrays(args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } let input = vec![Some(1), None, Some(3)]; let input = ColumnarValue::from(Arc::new(Int64Array::from(input)) as ArrayRef); @@ -165,9 +258,26 @@ with the `SessionContext`. 
DataFusion provides the [`create_udf`] and helper functions to make this easier. ```rust +# use std::sync::Arc; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::common::cast::as_int64_array; +# use datafusion::common::Result; +# use datafusion::logical_expr::ColumnarValue; +# +# pub fn add_one(args: &[ColumnarValue]) -> Result { +# // Error handling omitted for brevity +# let args = ColumnarValue::values_to_arrays(args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } use datafusion::logical_expr::{Volatility, create_udf}; use datafusion::arrow::datatypes::DataType; -use std::sync::Arc; let udf = create_udf( "add_one", @@ -178,12 +288,7 @@ let udf = create_udf( ); ``` -[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html -[`create_udf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udf.html -[`process_scalar_func_inputs`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/fn.process_scalar_func_inputs.html -[`advanced_udf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs - -A few things to note: +A few things to note on `create_udf`: - The first argument is the name of the function. This is the name that will be used in SQL queries. - The second argument is a vector of `DataType`s. This is the list of argument types that the function accepts. I.e. in @@ -198,20 +303,51 @@ A few things to note: That gives us a `ScalarUDF` that we can register with the `SessionContext`: ```rust +# use std::sync::Arc; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::common::cast::as_int64_array; +# use datafusion::common::Result; +# use datafusion::logical_expr::ColumnarValue; +# +# pub fn add_one(args: &[ColumnarValue]) -> Result { +# // Error handling omitted for brevity +# let args = ColumnarValue::values_to_arrays(args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } +use datafusion::logical_expr::{Volatility, create_udf}; +use datafusion::arrow::datatypes::DataType; use datafusion::execution::context::SessionContext; -let mut ctx = SessionContext::new(); - -ctx.register_udf(udf); +#[tokio::main] +async fn main() { + let udf = create_udf( + "add_one", + vec![DataType::Int64], + DataType::Int64, + Volatility::Immutable, + Arc::new(add_one), + ); + + let mut ctx = SessionContext::new(); + ctx.register_udf(udf); + + // At this point, you can use the `add_one` function in your query: + let query = "SELECT add_one(1)"; + let df = ctx.sql(&query).await.unwrap(); +} ``` -At this point, you can use the `add_one` function in your query: - -```rust -let sql = "SELECT add_one(1)"; - -let df = ctx.sql( & sql).await.unwrap(); -``` +[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html +[`create_udf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udf.html +[`process_scalar_func_inputs`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/fn.process_scalar_func_inputs.html +[`advanced_udf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udf.rs ## Adding a Window UDF @@ 
-294,17 +430,61 @@ with the `SessionContext`. DataFusion provides the [`create_udwf`] helper functi There is a lower level API with more functionality but is more complex, that is documented in [`advanced_udwf.rs`]. ```rust +# use datafusion::arrow::{array::{ArrayRef, Float64Array, AsArray}, datatypes::Float64Type}; +# use datafusion::logical_expr::{PartitionEvaluator}; +# use datafusion::common::ScalarValue; +# use datafusion::error::Result; +# +# #[derive(Clone, Debug)] +# struct MyPartitionEvaluator {} +# +# impl MyPartitionEvaluator { +# fn new() -> Self { +# Self {} +# } +# } +# +# impl PartitionEvaluator for MyPartitionEvaluator { +# fn uses_window_frame(&self) -> bool { +# true +# } +# +# fn evaluate( +# &mut self, +# values: &[ArrayRef], +# range: &std::ops::Range, +# ) -> Result { +# // Again, the input argument is an array of floating +# // point numbers to calculate a moving average +# let arr: &Float64Array = values[0].as_ref().as_primitive::(); +# +# let range_len = range.end - range.start; +# +# // our smoothing function will average all the values in the +# let output = if range_len > 0 { +# let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum(); +# Some(sum / range_len as f64) +# } else { +# None +# }; +# +# Ok(ScalarValue::Float64(output)) +# } +# } +# fn make_partition_evaluator() -> Result> { +# Ok(Box::new(MyPartitionEvaluator::new())) +# } use datafusion::logical_expr::{Volatility, create_udwf}; use datafusion::arrow::datatypes::DataType; use std::sync::Arc; // here is where we define the UDWF. We also declare its signature: let smooth_it = create_udwf( -"smooth_it", -DataType::Float64, -Arc::new(DataType::Float64), -Volatility::Immutable, -Arc::new(make_partition_evaluator), + "smooth_it", + DataType::Float64, + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new(make_partition_evaluator), ); ``` @@ -327,6 +507,62 @@ The `create_udwf` has five arguments to check: That gives us a `WindowUDF` that we can register with the `SessionContext`: ```rust +# use datafusion::arrow::{array::{ArrayRef, Float64Array, AsArray}, datatypes::Float64Type}; +# use datafusion::logical_expr::{PartitionEvaluator}; +# use datafusion::common::ScalarValue; +# use datafusion::error::Result; +# +# #[derive(Clone, Debug)] +# struct MyPartitionEvaluator {} +# +# impl MyPartitionEvaluator { +# fn new() -> Self { +# Self {} +# } +# } +# +# impl PartitionEvaluator for MyPartitionEvaluator { +# fn uses_window_frame(&self) -> bool { +# true +# } +# +# fn evaluate( +# &mut self, +# values: &[ArrayRef], +# range: &std::ops::Range, +# ) -> Result { +# // Again, the input argument is an array of floating +# // point numbers to calculate a moving average +# let arr: &Float64Array = values[0].as_ref().as_primitive::(); +# +# let range_len = range.end - range.start; +# +# // our smoothing function will average all the values in the +# let output = if range_len > 0 { +# let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum(); +# Some(sum / range_len as f64) +# } else { +# None +# }; +# +# Ok(ScalarValue::Float64(output)) +# } +# } +# fn make_partition_evaluator() -> Result> { +# Ok(Box::new(MyPartitionEvaluator::new())) +# } +# use datafusion::logical_expr::{Volatility, create_udwf}; +# use datafusion::arrow::datatypes::DataType; +# use std::sync::Arc; +# +# // here is where we define the UDWF. 
We also declare its signature: +# let smooth_it = create_udwf( +# "smooth_it", +# DataType::Float64, +# Arc::new(DataType::Float64), +# Volatility::Immutable, +# Arc::new(make_partition_evaluator), +# ); use datafusion::execution::context::SessionContext; let ctx = SessionContext::new(); @@ -336,10 +572,9 @@ ctx.register_udwf(smooth_it); At this point, you can use the `smooth_it` function in your query: -For example, if we have a [ -`cars.csv`](https://github.com/apache/datafusion/blob/main/datafusion/core/tests/data/cars.csv) whose contents like +For example, if we have a [`cars.csv`](https://github.com/apache/datafusion/blob/main/datafusion/core/tests/data/cars.csv) whose contents like -``` +```csv car,speed,time red,20.0,1996-04-12T12:05:03.000000000 red,20.3,1996-04-12T12:05:04.000000000 @@ -351,30 +586,97 @@ green,10.3,1996-04-12T12:05:04.000000000 Then, we can query like below: ```rust +# use datafusion::arrow::{array::{ArrayRef, Float64Array, AsArray}, datatypes::Float64Type}; +# use datafusion::logical_expr::{PartitionEvaluator}; +# use datafusion::common::ScalarValue; +# use datafusion::error::Result; +# +# #[derive(Clone, Debug)] +# struct MyPartitionEvaluator {} +# +# impl MyPartitionEvaluator { +# fn new() -> Self { +# Self {} +# } +# } +# +# impl PartitionEvaluator for MyPartitionEvaluator { +# fn uses_window_frame(&self) -> bool { +# true +# } +# +# fn evaluate( +# &mut self, +# values: &[ArrayRef], +# range: &std::ops::Range, +# ) -> Result { +# // Again, the input argument is an array of floating +# // point numbers to calculate a moving average +# let arr: &Float64Array = values[0].as_ref().as_primitive::(); +# +# let range_len = range.end - range.start; +# +# // our smoothing function will average all the values in the +# let output = if range_len > 0 { +# let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum(); +# Some(sum / range_len as f64) +# } else { +# None +# }; +# +# Ok(ScalarValue::Float64(output)) +# } +# } +# fn make_partition_evaluator() -> Result> { +# Ok(Box::new(MyPartitionEvaluator::new())) +# } +# use datafusion::logical_expr::{Volatility, create_udwf}; +# use datafusion::arrow::datatypes::DataType; +# use std::sync::Arc; +# use datafusion::execution::context::SessionContext; + use datafusion::datasource::file_format::options::CsvReadOptions; -// register csv table first -let csv_path = "cars.csv".to_string(); -ctx.register_csv("cars", & csv_path, CsvReadOptions::default ().has_header(true)).await?; -// do query with smooth_it -let df = ctx -.sql( -"SELECT \ - car, \ - speed, \ - smooth_it(speed) OVER (PARTITION BY car ORDER BY time) as smooth_speed,\ - time \ - from cars \ - ORDER BY \ - car", -) -.await?; -// print the results -df.show().await?; + +#[tokio::main] +async fn main() -> Result<()> { + + let ctx = SessionContext::new(); + + let smooth_it = create_udwf( + "smooth_it", + DataType::Float64, + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new(make_partition_evaluator), + ); + ctx.register_udwf(smooth_it); + + // register csv table first + let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string(); + ctx.register_csv("cars", &csv_path, CsvReadOptions::default().has_header(true)).await?; + + // do query with smooth_it + let df = ctx + .sql(r#" + SELECT + car, + speed, + smooth_it(speed) OVER (PARTITION BY car ORDER BY time) as smooth_speed, + time + FROM cars + ORDER BY car + "#) + .await?; + + // print the results + df.show().await?; + Ok(()) +} ``` -the output will be like: +The output will be like: 
-``` +```text +-------+-------+--------------------+---------------------+ | car | speed | smooth_speed | time | +-------+-------+--------------------+---------------------+ @@ -403,6 +705,7 @@ Aggregate UDFs are functions that take a group of rows and return a single value For example, we will declare a single-type, single return type UDAF that computes the geometric mean. ```rust + use datafusion::arrow::array::ArrayRef; use datafusion::scalar::ScalarValue; use datafusion::{error::Result, physical_plan::Accumulator}; @@ -427,7 +730,7 @@ impl Accumulator for GeometricMean { // This function serializes our state to `ScalarValue`, which DataFusion uses // to pass this state between execution stages. // Note that this can be arbitrary data. - fn state(&self) -> Result> { + fn state(&mut self) -> Result> { Ok(vec![ ScalarValue::from(self.prod), ScalarValue::from(self.n), @@ -436,7 +739,7 @@ impl Accumulator for GeometricMean { // DataFusion expects this function to return the final value of this aggregator. // in this case, this is the formula of the geometric mean - fn evaluate(&self) -> Result { + fn evaluate(&mut self) -> Result { let value = self.prod.powf(1.0 / self.n as f64); Ok(ScalarValue::from(value)) } @@ -491,37 +794,106 @@ impl Accumulator for GeometricMean { } ``` -### registering an Aggregate UDF +### Registering an Aggregate UDF To register a Aggregate UDF, you need to wrap the function implementation in a [`AggregateUDF`] struct and then register it with the `SessionContext`. DataFusion provides the [`create_udaf`] helper functions to make this easier. There is a lower level API with more functionality but is more complex, that is documented in [`advanced_udaf.rs`]. ```rust +# use datafusion::arrow::array::ArrayRef; +# use datafusion::scalar::ScalarValue; +# use datafusion::{error::Result, physical_plan::Accumulator}; +# +# #[derive(Debug)] +# struct GeometricMean { +# n: u32, +# prod: f64, +# } +# +# impl GeometricMean { +# pub fn new() -> Self { +# GeometricMean { n: 0, prod: 1.0 } +# } +# } +# +# impl Accumulator for GeometricMean { +# fn state(&mut self) -> Result> { +# Ok(vec![ +# ScalarValue::from(self.prod), +# ScalarValue::from(self.n), +# ]) +# } +# +# fn evaluate(&mut self) -> Result { +# let value = self.prod.powf(1.0 / self.n as f64); +# Ok(ScalarValue::from(value)) +# } +# +# fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { +# if values.is_empty() { +# return Ok(()); +# } +# let arr = &values[0]; +# (0..arr.len()).try_for_each(|index| { +# let v = ScalarValue::try_from_array(arr, index)?; +# +# if let ScalarValue::Float64(Some(value)) = v { +# self.prod *= value; +# self.n += 1; +# } else { +# unreachable!("") +# } +# Ok(()) +# }) +# } +# +# fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { +# if states.is_empty() { +# return Ok(()); +# } +# let arr = &states[0]; +# (0..arr.len()).try_for_each(|index| { +# let v = states +# .iter() +# .map(|array| ScalarValue::try_from_array(array, index)) +# .collect::>>()?; +# if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) +# { +# self.prod *= prod; +# self.n += n; +# } else { +# unreachable!("") +# } +# Ok(()) +# }) +# } +# +# fn size(&self) -> usize { +# std::mem::size_of_val(self) +# } +# } + use datafusion::logical_expr::{Volatility, create_udaf}; use datafusion::arrow::datatypes::DataType; use std::sync::Arc; // here is where we define the UDAF. 
We also declare its signature: let geometric_mean = create_udaf( -// the name; used to represent it in plan descriptions and in the registry, to use in SQL. -"geo_mean", -// the input type; DataFusion guarantees that the first entry of `values` in `update` has this type. -vec![DataType::Float64], -// the return type; DataFusion expects this to match the type returned by `evaluate`. -Arc::new(DataType::Float64), -Volatility::Immutable, -// This is the accumulator factory; DataFusion uses it to create new accumulators. -Arc::new( | _ | Ok(Box::new(GeometricMean::new()))), -// This is the description of the state. `state()` must match the types here. -Arc::new(vec![DataType::Float64, DataType::UInt32]), + // the name; used to represent it in plan descriptions and in the registry, to use in SQL. + "geo_mean", + // the input type; DataFusion guarantees that the first entry of `values` in `update` has this type. + vec![DataType::Float64], + // the return type; DataFusion expects this to match the type returned by `evaluate`. + Arc::new(DataType::Float64), + Volatility::Immutable, + // This is the accumulator factory; DataFusion uses it to create new accumulators. + Arc::new( | _ | Ok(Box::new(GeometricMean::new()))), + // This is the description of the state. `state()` must match the types here. + Arc::new(vec![DataType::Float64, DataType::UInt32]), ); ``` -[`aggregateudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.AggregateUDF.html -[`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html -[`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udaf.rs - The `create_udaf` has six arguments to check: - The first argument is the name of the function. This is the name that will be used in SQL queries. @@ -535,22 +907,119 @@ The `create_udaf` has six arguments to check: - The fifth argument is the function implementation. This is the function that we defined above. - The sixth argument is the description of the state, which will by passed between execution stages. 
-That gives us a `AggregateUDF` that we can register with the `SessionContext`: - ```rust -use datafusion::execution::context::SessionContext; -let ctx = SessionContext::new(); +# use datafusion::arrow::array::ArrayRef; +# use datafusion::scalar::ScalarValue; +# use datafusion::{error::Result, physical_plan::Accumulator}; +# +# #[derive(Debug)] +# struct GeometricMean { +# n: u32, +# prod: f64, +# } +# +# impl GeometricMean { +# pub fn new() -> Self { +# GeometricMean { n: 0, prod: 1.0 } +# } +# } +# +# impl Accumulator for GeometricMean { +# fn state(&mut self) -> Result> { +# Ok(vec![ +# ScalarValue::from(self.prod), +# ScalarValue::from(self.n), +# ]) +# } +# +# fn evaluate(&mut self) -> Result { +# let value = self.prod.powf(1.0 / self.n as f64); +# Ok(ScalarValue::from(value)) +# } +# +# fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { +# if values.is_empty() { +# return Ok(()); +# } +# let arr = &values[0]; +# (0..arr.len()).try_for_each(|index| { +# let v = ScalarValue::try_from_array(arr, index)?; +# +# if let ScalarValue::Float64(Some(value)) = v { +# self.prod *= value; +# self.n += 1; +# } else { +# unreachable!("") +# } +# Ok(()) +# }) +# } +# +# fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { +# if states.is_empty() { +# return Ok(()); +# } +# let arr = &states[0]; +# (0..arr.len()).try_for_each(|index| { +# let v = states +# .iter() +# .map(|array| ScalarValue::try_from_array(array, index)) +# .collect::>>()?; +# if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) +# { +# self.prod *= prod; +# self.n += n; +# } else { +# unreachable!("") +# } +# Ok(()) +# }) +# } +# +# fn size(&self) -> usize { +# std::mem::size_of_val(self) +# } +# } -ctx.register_udaf(geometric_mean); -``` +use datafusion::logical_expr::{Volatility, create_udaf}; +use datafusion::arrow::datatypes::DataType; +use std::sync::Arc; +use datafusion::execution::context::SessionContext; +use datafusion::datasource::file_format::options::CsvReadOptions; -Then, we can query like below: +#[tokio::main] +async fn main() -> Result<()> { + let geometric_mean = create_udaf( + "geo_mean", + vec![DataType::Float64], + Arc::new(DataType::Float64), + Volatility::Immutable, + Arc::new( | _ | Ok(Box::new(GeometricMean::new()))), + Arc::new(vec![DataType::Float64, DataType::UInt32]), + ); + + // That gives us a `AggregateUDF` that we can register with the `SessionContext`: + use datafusion::execution::context::SessionContext; + + let ctx = SessionContext::new(); + ctx.register_udaf(geometric_mean); + + // register csv table first + let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string(); + ctx.register_csv("cars", &csv_path, CsvReadOptions::default().has_header(true)).await?; + + // Then, we can query like below: + let df = ctx.sql("SELECT geo_mean(speed) FROM cars").await?; + Ok(()) +} -```rust -let df = ctx.sql("SELECT geo_mean(a) FROM t").await?; ``` +[`aggregateudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.AggregateUDF.html +[`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html +[`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udaf.rs + ## Adding a User-Defined Table Function A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. @@ -592,12 +1061,17 @@ In the `call` method, you parse the input `Expr`s and return a `TableProvider`. validation of the input `Expr`s, e.g. 
checking that the number of arguments is correct. ```rust -use datafusion::common::plan_err; -use datafusion::datasource::function::TableFunctionImpl; -// Other imports here +use std::sync::Arc; +use datafusion::common::{plan_err, ScalarValue, Result}; +use datafusion::catalog::{TableFunctionImpl, TableProvider}; +use datafusion::arrow::array::{ArrayRef, Int64Array}; +use datafusion::datasource::memory::MemTable; +use arrow::record_batch::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion_expr::Expr; /// A table function that returns a table provider with the value as a single column -#[derive(Default)] +#[derive(Debug)] pub struct EchoFunction {} impl TableFunctionImpl for EchoFunction { @@ -628,22 +1102,57 @@ impl TableFunctionImpl for EchoFunction { With the UDTF implemented, you can register it with the `SessionContext`: ```rust +# use std::sync::Arc; +# use datafusion::common::{plan_err, ScalarValue, Result}; +# use datafusion::catalog::{TableFunctionImpl, TableProvider}; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::datasource::memory::MemTable; +# use arrow::record_batch::RecordBatch; +# use arrow::datatypes::{DataType, Field, Schema}; +# use datafusion_expr::Expr; +# +# /// A table function that returns a table provider with the value as a single column +# #[derive(Debug, Default)] +# pub struct EchoFunction {} +# +# impl TableFunctionImpl for EchoFunction { +# fn call(&self, exprs: &[Expr]) -> Result> { +# let Some(Expr::Literal(ScalarValue::Int64(Some(value)))) = exprs.get(0) else { +# return plan_err!("First argument must be an integer"); +# }; +# +# // Create the schema for the table +# let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); +# +# // Create a single RecordBatch with the value as a single column +# let batch = RecordBatch::try_new( +# schema.clone(), +# vec![Arc::new(Int64Array::from(vec![*value]))], +# )?; +# +# // Create a MemTable plan that returns the RecordBatch +# let provider = MemTable::try_new(schema, vec![vec![batch]])?; +# +# Ok(Arc::new(provider)) +# } +# } + use datafusion::execution::context::SessionContext; +use datafusion::arrow::util::pretty; -let ctx = SessionContext::new(); +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); -ctx.register_udtf("echo", Arc::new(EchoFunction::default ())); -``` + ctx.register_udtf("echo", Arc::new(EchoFunction::default())); -And if all goes well, you can use it in your query: + // And if all goes well, you can use it in your query: -```rust -use datafusion::arrow::util::pretty; - -let df = ctx.sql("SELECT * FROM echo(1)").await?; + let results = ctx.sql("SELECT * FROM echo(1)").await?.collect().await?; + pretty::print_batches(&results)?; + Ok(()) +} -let results = df.collect().await?; -pretty::print_batches( & results) ?; // +---+ // | a | // +---+ diff --git a/docs/source/library-user-guide/api-health.md b/docs/source/library-user-guide/api-health.md index b9c6de370e55..87d3754b21a7 100644 --- a/docs/source/library-user-guide/api-health.md +++ b/docs/source/library-user-guide/api-health.md @@ -62,8 +62,8 @@ To mark the API as deprecated, use the `#[deprecated(since = "...", note = "..." 
For example: ```rust - #[deprecated(since = "41.0.0", note = "Use SessionStateBuilder")] - pub fn new_with_config_rt(config: SessionConfig, runtime: Arc) -> Self +#[deprecated(since = "41.0.0", note = "Use new API instead")] +pub fn api_to_deprecated(a: usize, b: usize) {} ``` Deprecated methods will remain in the codebase for a period of 6 major versions or 6 months, whichever is longer, to provide users ample time to transition away from them. diff --git a/docs/source/library-user-guide/catalogs.md b/docs/source/library-user-guide/catalogs.md index 13158d656423..906039ba2300 100644 --- a/docs/source/library-user-guide/catalogs.md +++ b/docs/source/library-user-guide/catalogs.md @@ -40,6 +40,11 @@ In the following example, we'll implement an in memory catalog, starting with th The `MemorySchemaProvider` is a simple implementation of the `SchemaProvider` trait. It stores state (i.e. tables) in a `DashMap`, which then underlies the `SchemaProvider` trait. ```rust +use std::sync::Arc; +use dashmap::DashMap; +use datafusion::catalog::{TableProvider, SchemaProvider}; + +#[derive(Debug)] pub struct MemorySchemaProvider { tables: DashMap>, } @@ -50,6 +55,20 @@ pub struct MemorySchemaProvider { Then we implement the `SchemaProvider` trait for `MemorySchemaProvider`. ```rust +# use std::sync::Arc; +# use dashmap::DashMap; +# use datafusion::catalog::TableProvider; +# +# #[derive(Debug)] +# pub struct MemorySchemaProvider { +# tables: DashMap>, +# } + +use std::any::Any; +use datafusion::catalog::SchemaProvider; +use async_trait::async_trait; +use datafusion::common::{Result, exec_err}; + #[async_trait] impl SchemaProvider for MemorySchemaProvider { fn as_any(&self) -> &dyn Any { @@ -63,8 +82,8 @@ impl SchemaProvider for MemorySchemaProvider { .collect() } - async fn table(&self, name: &str) -> Option> { - self.tables.get(name).map(|table| table.value().clone()) + async fn table(&self, name: &str) -> Result>> { + Ok(self.tables.get(name).map(|table| table.value().clone())) } fn register_table( @@ -93,12 +112,85 @@ impl SchemaProvider for MemorySchemaProvider { Without getting into a `CatalogProvider` implementation, we can create a `MemorySchemaProvider` and register `TableProvider`s with it. 
```rust +# use std::sync::Arc; +# use dashmap::DashMap; +# use datafusion::catalog::TableProvider; +# +# #[derive(Debug)] +# pub struct MemorySchemaProvider { +# tables: DashMap>, +# } +# +# use std::any::Any; +# use datafusion::catalog::SchemaProvider; +# use async_trait::async_trait; +# use datafusion::common::{Result, exec_err}; +# +# #[async_trait] +# impl SchemaProvider for MemorySchemaProvider { +# fn as_any(&self) -> &dyn Any { +# self +# } +# +# fn table_names(&self) -> Vec { +# self.tables +# .iter() +# .map(|table| table.key().clone()) +# .collect() +# } +# +# async fn table(&self, name: &str) -> Result>> { +# Ok(self.tables.get(name).map(|table| table.value().clone())) +# } +# +# fn register_table( +# &self, +# name: String, +# table: Arc, +# ) -> Result>> { +# if self.table_exist(name.as_str()) { +# return exec_err!( +# "The table {name} already exists" +# ); +# } +# Ok(self.tables.insert(name, table)) +# } +# +# fn deregister_table(&self, name: &str) -> Result>> { +# Ok(self.tables.remove(name).map(|(_, table)| table)) +# } +# +# fn table_exist(&self, name: &str) -> bool { +# self.tables.contains_key(name) +# } +# } + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion::datasource::MemTable; +use arrow::array::{self, Array, ArrayRef, Int32Array}; + +impl MemorySchemaProvider { + /// Instantiates a new MemorySchemaProvider with an empty collection of tables. + pub fn new() -> Self { + Self { + tables: DashMap::new(), + } + } +} + let schema_provider = Arc::new(MemorySchemaProvider::new()); -let table_provider = _; // create a table provider -schema_provider.register_table("table_name".to_string(), table_provider); +let table_provider = { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); + let arr = Arc::new(Int32Array::from((1..=1).collect::>())); + let partitions = vec![vec![RecordBatch::try_new(schema.clone(), vec![arr as ArrayRef]).unwrap()]]; + Arc::new(MemTable::try_new(schema, partitions).unwrap()) +}; + +schema_provider.register_table("users".to_string(), table_provider); -let table = schema_provider.table("table_name").unwrap(); +let table = schema_provider.table("users"); ``` ### Asynchronous `SchemaProvider` @@ -108,27 +200,44 @@ It's often useful to fetch metadata about which tables are in a schema, from a r The trait is roughly the same except for the `table` method, and the addition of the `#[async_trait]` attribute. ```rust +# use async_trait::async_trait; +# use std::sync::Arc; +# use datafusion::catalog::{TableProvider, SchemaProvider}; +# use datafusion::common::Result; +# +# type OriginSchema = arrow::datatypes::Schema; +# +# #[derive(Debug)] +# struct Schema(OriginSchema); + #[async_trait] impl SchemaProvider for Schema { - async fn table(&self, name: &str) -> Option> { - // fetch metadata from remote source + async fn table(&self, name: &str) -> Result>> { +# todo!(); } + +# fn as_any(&self) -> &(dyn std::any::Any + 'static) { todo!() } +# fn table_names(&self) -> Vec { todo!() } +# fn table_exist(&self, _: &str) -> bool { todo!() } } ``` ## Implementing `MemoryCatalogProvider` -As mentioned, the `CatalogProvider` can manage the schemas in a catalog, and the `MemoryCatalogProvider` is a simple implementation of the `CatalogProvider` trait. It stores schemas in a `DashMap`. +As mentioned, the `CatalogProvider` can manage the schemas in a catalog, and the `MemoryCatalogProvider` is a simple implementation of the `CatalogProvider` trait. 
It stores schemas in a `DashMap`. With that the `CatalogProvider` trait can be implemented. ```rust +use std::any::Any; +use std::sync::Arc; +use dashmap::DashMap; +use datafusion::catalog::{CatalogProvider, SchemaProvider}; +use datafusion::common::Result; + +#[derive(Debug)] pub struct MemoryCatalogProvider { schemas: DashMap>, } -``` - -With that the `CatalogProvider` trait can be implemented. -```rust impl CatalogProvider for MemoryCatalogProvider { fn as_any(&self) -> &dyn Any { self @@ -167,20 +276,24 @@ impl CatalogProvider for MemoryCatalogProvider { } ``` -Again, this is fairly straightforward, as there's an underlying data structure to store the state, via key-value pairs. +Again, this is fairly straightforward, as there's an underlying data structure to store the state, via key-value pairs. With that the `CatalogProviderList` trait can be implemented. ## Implementing `MemoryCatalogProviderList` ```rust + +use std::any::Any; +use std::sync::Arc; +use dashmap::DashMap; +use datafusion::catalog::{CatalogProviderList, CatalogProvider}; +use datafusion::common::Result; + +#[derive(Debug)] pub struct MemoryCatalogProviderList { /// Collection of catalogs containing schemas and ultimately TableProviders pub catalogs: DashMap>, } -``` -With that the `CatalogProviderList` trait can be implemented. - -```rust impl CatalogProviderList for MemoryCatalogProviderList { fn as_any(&self) -> &dyn Any { self diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index a7183fb3113e..886ac9629566 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -37,19 +37,84 @@ The `ExecutionPlan` trait at its core is a way to get a stream of batches. The a There are many different types of `SendableRecordBatchStream` implemented in DataFusion -- you can use a pre existing one, such as `MemoryStream` (if your `RecordBatch`es are all in memory) or implement your own custom logic, depending on your usecase. 
-Looking at the [example in this repo][ex], the execute method: +Looking at the full example below: ```rust +use std::any::Any; +use std::sync::{Arc, Mutex}; +use std::collections::{BTreeMap, HashMap}; +use datafusion::common::Result; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::{ + ExecutionPlan, SendableRecordBatchStream, DisplayAs, DisplayFormatType, + Statistics, PlanProperties +}; +use datafusion::execution::context::TaskContext; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::arrow::record_batch::RecordBatch; + +/// A User, with an id and a bank account +#[derive(Clone, Debug)] +struct User { + id: u8, + bank_account: u64, +} + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone, Debug)] +pub struct CustomDataSource { + inner: Arc>, +} + +#[derive(Debug)] +struct CustomDataSourceInner { + data: HashMap, + bank_account_index: BTreeMap, +} + +#[derive(Debug)] struct CustomExec { db: CustomDataSource, projected_schema: SchemaRef, } +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "CustomExec") + } +} + impl ExecutionPlan for CustomExec { - fn name(&self) { + fn name(&self) -> &str { "CustomExec" } + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + + fn properties(&self) -> &PlanProperties { + unreachable!() + } + + fn children(&self) -> Vec<&Arc> { + Vec::new() + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + fn execute( &self, _partition: usize, @@ -83,7 +148,7 @@ impl ExecutionPlan for CustomExec { } ``` -This: +This `execute` method: 1. Gets the users from the database 2. Constructs the individual output arrays (columns) @@ -98,6 +163,134 @@ With the `ExecutionPlan` implemented, we can now implement the `scan` method of The `scan` method of the `TableProvider` returns a `Result>`. We can use the `Arc` to return a reference-counted pointer to the `ExecutionPlan` we implemented. 
In the example, this is done by: ```rust + +# use std::any::Any; +# use std::sync::{Arc, Mutex}; +# use std::collections::{BTreeMap, HashMap}; +# use datafusion::common::Result; +# use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +# use datafusion::physical_plan::expressions::PhysicalSortExpr; +# use datafusion::physical_plan::{ +# ExecutionPlan, SendableRecordBatchStream, DisplayAs, DisplayFormatType, +# Statistics, PlanProperties +# }; +# use datafusion::execution::context::TaskContext; +# use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +# use datafusion::physical_plan::memory::MemoryStream; +# use datafusion::arrow::record_batch::RecordBatch; +# +# /// A User, with an id and a bank account +# #[derive(Clone, Debug)] +# struct User { +# id: u8, +# bank_account: u64, +# } +# +# /// A custom datasource, used to represent a datastore with a single index +# #[derive(Clone, Debug)] +# pub struct CustomDataSource { +# inner: Arc>, +# } +# +# #[derive(Debug)] +# struct CustomDataSourceInner { +# data: HashMap, +# bank_account_index: BTreeMap, +# } +# +# #[derive(Debug)] +# struct CustomExec { +# db: CustomDataSource, +# projected_schema: SchemaRef, +# } +# +# impl DisplayAs for CustomExec { +# fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { +# write!(f, "CustomExec") +# } +# } +# +# impl ExecutionPlan for CustomExec { +# fn name(&self) -> &str { +# "CustomExec" +# } +# +# fn as_any(&self) -> &dyn Any { +# self +# } +# +# fn schema(&self) -> SchemaRef { +# self.projected_schema.clone() +# } +# +# +# fn properties(&self) -> &PlanProperties { +# unreachable!() +# } +# +# fn children(&self) -> Vec<&Arc> { +# Vec::new() +# } +# +# fn with_new_children( +# self: Arc, +# _: Vec>, +# ) -> Result> { +# Ok(self) +# } +# +# fn execute( +# &self, +# _partition: usize, +# _context: Arc, +# ) -> Result { +# let users: Vec = { +# let db = self.db.inner.lock().unwrap(); +# db.data.values().cloned().collect() +# }; +# +# let mut id_array = UInt8Builder::with_capacity(users.len()); +# let mut account_array = UInt64Builder::with_capacity(users.len()); +# +# for user in users { +# id_array.append_value(user.id); +# account_array.append_value(user.bank_account); +# } +# +# Ok(Box::pin(MemoryStream::try_new( +# vec![RecordBatch::try_new( +# self.projected_schema.clone(), +# vec![ +# Arc::new(id_array.finish()), +# Arc::new(account_array.finish()), +# ], +# )?], +# self.schema(), +# None, +# )?)) +# } +# } + +use async_trait::async_trait; +use datafusion::logical_expr::expr::Expr; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::physical_plan::project_schema; +use datafusion::catalog::Session; + +impl CustomExec { + fn new( + projections: Option<&Vec>, + schema: SchemaRef, + db: CustomDataSource, + ) -> Self { + let projected_schema = project_schema(&schema, projections).unwrap(); + Self { + db, + projected_schema, + } + } +} + impl CustomDataSource { pub(crate) async fn create_physical_plan( &self, @@ -110,6 +303,21 @@ impl CustomDataSource { #[async_trait] impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, _state: &dyn Session, @@ -145,17 +353,194 @@ For filters that can be pushed down, they'll be passed to the `scan` 
method as t In order to use the custom table provider, we need to register it with DataFusion. This is done by creating a `TableProvider` and registering it with the `SessionContext`. -```rust -let ctx = SessionContext::new(); - -let custom_table_provider = CustomDataSource::new(); -ctx.register_table("custom_table", Arc::new(custom_table_provider)); -``` - This will allow you to use the custom table provider in DataFusion. For example, you could use it in a SQL query to get a `DataFrame`. ```rust -let df = ctx.sql("SELECT id, bank_account FROM custom_table")?; +# use std::any::Any; +# use std::sync::{Arc, Mutex}; +# use std::collections::{BTreeMap, HashMap}; +# use datafusion::common::Result; +# use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +# use datafusion::physical_plan::expressions::PhysicalSortExpr; +# use datafusion::physical_plan::{ +# ExecutionPlan, SendableRecordBatchStream, DisplayAs, DisplayFormatType, +# Statistics, PlanProperties +# }; +# use datafusion::execution::context::TaskContext; +# use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +# use datafusion::physical_plan::memory::MemoryStream; +# use datafusion::arrow::record_batch::RecordBatch; +# +# /// A User, with an id and a bank account +# #[derive(Clone, Debug)] +# struct User { +# id: u8, +# bank_account: u64, +# } +# +# /// A custom datasource, used to represent a datastore with a single index +# #[derive(Clone, Debug)] +# pub struct CustomDataSource { +# inner: Arc>, +# } +# +# #[derive(Debug)] +# struct CustomDataSourceInner { +# data: HashMap, +# bank_account_index: BTreeMap, +# } +# +# #[derive(Debug)] +# struct CustomExec { +# db: CustomDataSource, +# projected_schema: SchemaRef, +# } +# +# impl DisplayAs for CustomExec { +# fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { +# write!(f, "CustomExec") +# } +# } +# +# impl ExecutionPlan for CustomExec { +# fn name(&self) -> &str { +# "CustomExec" +# } +# +# fn as_any(&self) -> &dyn Any { +# self +# } +# +# fn schema(&self) -> SchemaRef { +# self.projected_schema.clone() +# } +# +# +# fn properties(&self) -> &PlanProperties { +# unreachable!() +# } +# +# fn children(&self) -> Vec<&Arc> { +# Vec::new() +# } +# +# fn with_new_children( +# self: Arc, +# _: Vec>, +# ) -> Result> { +# Ok(self) +# } +# +# fn execute( +# &self, +# _partition: usize, +# _context: Arc, +# ) -> Result { +# let users: Vec = { +# let db = self.db.inner.lock().unwrap(); +# db.data.values().cloned().collect() +# }; +# +# let mut id_array = UInt8Builder::with_capacity(users.len()); +# let mut account_array = UInt64Builder::with_capacity(users.len()); +# +# for user in users { +# id_array.append_value(user.id); +# account_array.append_value(user.bank_account); +# } +# +# Ok(Box::pin(MemoryStream::try_new( +# vec![RecordBatch::try_new( +# self.projected_schema.clone(), +# vec![ +# Arc::new(id_array.finish()), +# Arc::new(account_array.finish()), +# ], +# )?], +# self.schema(), +# None, +# )?)) +# } +# } + +# use async_trait::async_trait; +# use datafusion::logical_expr::expr::Expr; +# use datafusion::datasource::{TableProvider, TableType}; +# use datafusion::physical_plan::project_schema; +# use datafusion::catalog::Session; +# +# impl CustomExec { +# fn new( +# projections: Option<&Vec>, +# schema: SchemaRef, +# db: CustomDataSource, +# ) -> Self { +# let projected_schema = project_schema(&schema, projections).unwrap(); +# Self { +# db, +# projected_schema, +# } +# } +# } +# +# impl CustomDataSource { +# pub(crate) async 
fn create_physical_plan( +# &self, +# projections: Option<&Vec>, +# schema: SchemaRef, +# ) -> Result> { +# Ok(Arc::new(CustomExec::new(projections, schema, self.clone()))) +# } +# } +# +# #[async_trait] +# impl TableProvider for CustomDataSource { +# fn as_any(&self) -> &dyn Any { +# self +# } +# +# fn schema(&self) -> SchemaRef { +# SchemaRef::new(Schema::new(vec![ +# Field::new("id", DataType::UInt8, false), +# Field::new("bank_account", DataType::UInt64, true), +# ])) +# } +# +# fn table_type(&self) -> TableType { +# TableType::Base +# } +# +# async fn scan( +# &self, +# _state: &dyn Session, +# projection: Option<&Vec>, +# // filters and limit can be used here to inject some push-down operations if needed +# _filters: &[Expr], +# _limit: Option, +# ) -> Result> { +# return self.create_physical_plan(projection, self.schema()).await; +# } +# } + +use datafusion::execution::context::SessionContext; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + let custom_table_provider = CustomDataSource { + inner: Arc::new(Mutex::new(CustomDataSourceInner { + data: Default::default(), + bank_account_index: Default::default(), + })), + }; + + ctx.register_table("customers", Arc::new(custom_table_provider)); + let df = ctx.sql("SELECT id, bank_account FROM customers").await?; + + Ok(()) +} + ``` ## Recap diff --git a/docs/source/library-user-guide/query-optimizer.md b/docs/source/library-user-guide/query-optimizer.md index c2c60af85f4c..fad8adf83d81 100644 --- a/docs/source/library-user-guide/query-optimizer.md +++ b/docs/source/library-user-guide/query-optimizer.md @@ -35,18 +35,28 @@ and applying it to a logical plan to produce an optimized logical plan. ```rust +use std::sync::Arc; +use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; +use datafusion::optimizer::{OptimizerRule, OptimizerContext, Optimizer}; + // We need a logical plan as the starting point. There are many ways to build a logical plan: // // The `datafusion-expr` crate provides a LogicalPlanBuilder // The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL // The `datafusion` crate provides a DataFrame API that can create a LogicalPlan -let logical_plan = ... -let mut config = OptimizerContext::default(); -let optimizer = Optimizer::new(&config); -let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; +let initial_logical_plan = LogicalPlanBuilder::empty(false).build().unwrap(); + +// use builtin rules or customized rules +let rules: Vec> = vec![]; + +let optimizer = Optimizer::with_rules(rules); + +let config = OptimizerContext::new().with_max_passes(16); -fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { +let optimized_plan = optimizer.optimize(initial_logical_plan.clone(), &config, observer); + +fn observer(plan: &LogicalPlan, rule: &dyn OptimizerRule) { println!( "After applying rule '{}':\n{}", rule.name(), @@ -55,16 +65,6 @@ fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { } ``` -## Providing Custom Rules - -The optimizer can be created with a custom set of rules. - -```rust -let optimizer = Optimizer::with_rules(vec![ - Arc::new(MyRule {}) -]); -``` - ## Writing Optimization Rules Please refer to the @@ -72,26 +72,71 @@ Please refer to the example to learn more about the general approach to writing optimizer rules and then move onto studying the existing rules. 
+`OptimizerRule` transforms one ['LogicalPlan'] into another which +computes the same results, but in a potentially more efficient +way. If there are no suitable transformations for the input plan, +the optimizer can simply return it as is. + All rules must implement the `OptimizerRule` trait. ```rust -/// `OptimizerRule` transforms one ['LogicalPlan'] into another which -/// computes the same results, but in a potentially more efficient -/// way. If there are no suitable transformations for the input plan, -/// the optimizer can simply return it as is. -pub trait OptimizerRule { - /// Rewrite `plan` to an optimized form - fn optimize( - &self, - plan: &LogicalPlan, - config: &dyn OptimizerConfig, - ) -> Result; +# use datafusion::common::tree_node::Transformed; +# use datafusion::common::Result; +# use datafusion::logical_expr::LogicalPlan; +# use datafusion::optimizer::{OptimizerConfig, OptimizerRule}; +# + +#[derive(Default, Debug)] +struct MyOptimizerRule {} + +impl OptimizerRule for MyOptimizerRule { + fn name(&self) -> &str { + "my_optimizer_rule" + } - /// A human readable name for this optimizer rule - fn name(&self) -> &str; + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + unimplemented!() + } } ``` +## Providing Custom Rules + +The optimizer can be created with a custom set of rules. + +```rust +# use std::sync::Arc; +# use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; +# use datafusion::optimizer::{OptimizerRule, OptimizerConfig, OptimizerContext, Optimizer}; +# use datafusion::common::tree_node::Transformed; +# use datafusion::common::Result; +# +# #[derive(Default, Debug)] +# struct MyOptimizerRule {} +# +# impl OptimizerRule for MyOptimizerRule { +# fn name(&self) -> &str { +# "my_optimizer_rule" +# } +# +# fn rewrite( +# &self, +# plan: LogicalPlan, +# _config: &dyn OptimizerConfig, +# ) -> Result> { +# unimplemented!() +# } +# } + +let optimizer = Optimizer::with_rules(vec![ + Arc::new(MyOptimizerRule {}) +]); +``` + ### General Guidelines Rules typical walk the logical plan and walk the expression trees inside operators and selectively mutate @@ -168,16 +213,19 @@ and [#3555](https://github.com/apache/datafusion/issues/3555) occur where the ex There are currently two ways to create a name for an expression in the logical plan. ```rust +# use datafusion::common::Result; +# struct Expr; + impl Expr { /// Returns the name of this expression as it should appear in a schema. This name /// will not include any CAST expressions. pub fn display_name(&self) -> Result { - create_name(self) + Ok("display_name".to_string()) } /// Returns a full and complete string representation of this expression. pub fn canonical_name(&self) -> String { - format!("{}", self) + "canonical_name".to_string() } } ``` @@ -187,93 +235,99 @@ name to be used in a schema, `display_name` should be used. ### Utilities -There are a number of utility methods provided that take care of some common tasks. +There are a number of [utility methods][util] provided that take care of some common tasks. -### ExprVisitor +[util]: https://github.com/apache/datafusion/blob/main/datafusion/expr/src/utils.rs -The `ExprVisitor` and `ExprVisitable` traits provide a mechanism for applying a visitor pattern to an expression tree. +### Recursively walk an expression tree -Here is an example that demonstrates this. +The [TreeNode API] provides a convenient way to recursively walk an expression or plan tree. 
-```rust -fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Result<()> { - struct InSubqueryVisitor<'a> { - accum: &'a mut Vec, - } +For example, to find all subquery references in a logical plan, the following code can be used: - impl ExpressionVisitor for InSubqueryVisitor<'_> { - fn pre_visit(self, expr: &Expr) -> Result> { +```rust +# use datafusion::prelude::*; +# use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +# use datafusion::common::Result; +// Return all subquery references in an expression +fn extract_subquery_filters(expression: &Expr) -> Result> { + let mut extracted = vec![]; + expression.apply(|expr| { if let Expr::InSubquery(_) = expr { - self.accum.push(expr.to_owned()); + extracted.push(expr); } - Ok(Recursion::Continue(self)) - } - } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(extracted) +} +``` - expression.accept(InSubqueryVisitor { accum: extracted })?; - Ok(()) +Likewise you can use the [TreeNode API] to rewrite a `LogicalPlan` or `ExecutionPlan` + +```rust +# use datafusion::prelude::*; +# use datafusion::logical_expr::{LogicalPlan, Join}; +# use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +# use datafusion::common::Result; +// Return all joins in a logical plan +fn find_joins(overall_plan: &LogicalPlan) -> Result> { + let mut extracted = vec![]; + overall_plan.apply(|plan| { + if let LogicalPlan::Join(join) = plan { + extracted.push(join); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(extracted) } ``` -### Rewriting Expressions +### Rewriting expressions -The `MyExprRewriter` trait can be implemented to provide a way to rewrite expressions. This rule can then be applied -to an expression by calling `Expr::rewrite` (from the `ExprRewritable` trait). +The [TreeNode API] also provides a convenient way to rewrite expressions and +plans as well. For example to rewrite all expressions like -The `rewrite` method will perform a depth first walk of the expression and its children to rewrite an expression, -consuming `self` producing a new expression. +```sql +col BETWEEN x AND y +``` -```rust -let mut expr_rewriter = MyExprRewriter {}; -let expr = expr.rewrite(&mut expr_rewriter)?; +into + +```sql +col >= x AND col <= y ``` -Here is an example implementation which will rewrite `expr BETWEEN a AND b` as `expr >= a AND expr <= b`. Note that the -implementation does not need to perform any recursion since this is handled by the `rewrite` method. 
+you can use the following code: ```rust -struct MyExprRewriter {} - -impl ExprRewriter for MyExprRewriter { - fn mutate(&mut self, expr: Expr) -> Result { - match expr { - Expr::Between { +# use datafusion::prelude::*; +# use datafusion::logical_expr::{Between}; +# use datafusion::logical_expr::expr_fn::*; +# use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +# use datafusion::common::Result; +// Recursively rewrite all BETWEEN expressions +// returns Transformed::yes if any changes were made +fn rewrite_between(expr: Expr) -> Result> { + // transform_up does a bottom up rewrite + expr.transform_up(|expr| { + // only handle BETWEEN expressions + let Expr::Between(Between { negated, expr, low, high, - } => { - let expr: Expr = expr.as_ref().clone(); - let low: Expr = low.as_ref().clone(); - let high: Expr = high.as_ref().clone(); - if negated { - Ok(expr.clone().lt(low).or(expr.clone().gt(high))) - } else { - Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) - } - } - _ => Ok(expr.clone()), - } - } -} -``` - -### optimize_children - -Typically a rule is applied recursively to all operators within a query plan. Rather than duplicate -that logic in each rule, an `optimize_children` method is provided. This recursively invokes the `optimize` method on -the plan's children and then returns a node of the same type. - -```rust -fn optimize( - &self, - plan: &LogicalPlan, - _config: &mut OptimizerConfig, -) -> Result { - // recurse down and optimize children first - let plan = utils::optimize_children(self, plan, _config)?; - - ... + }) = expr else { + return Ok(Transformed::no(expr)) + }; + let rewritten_expr = if negated { + // don't rewrite NOT BETWEEN + Expr::Between(Between::new(expr, negated, low, high)) + } else { + // rewrite to (expr >= low) AND (expr <= high) + expr.clone().gt_eq(*low).and(expr.lt_eq(*high)) + }; + Ok(Transformed::yes(rewritten_expr)) + }) } ``` diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md index e0b6f434a032..1a6e9123086d 100644 --- a/docs/source/library-user-guide/working-with-exprs.md +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -61,12 +61,34 @@ We'll use a `ScalarUDF` expression as our example. 
This necessitates implementin So assuming you've written that function, you can use it to create an `Expr`: ```rust +# use std::sync::Arc; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::common::cast::as_int64_array; +# use datafusion::common::Result; +# use datafusion::logical_expr::ColumnarValue; +# +# pub fn add_one(args: &[ColumnarValue]) -> Result { +# // Error handling omitted for brevity +# let args = ColumnarValue::values_to_arrays(args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } +use datafusion::logical_expr::{Volatility, create_udf}; +use datafusion::arrow::datatypes::DataType; +use datafusion::logical_expr::{col, lit}; + let add_one_udf = create_udf( "add_one", vec![DataType::Int64], - Arc::new(DataType::Int64), + DataType::Int64, Volatility::Immutable, - make_scalar_function(add_one), // <-- the function we wrote + Arc::new(add_one), ); // make the expr `add_one(5)` @@ -99,11 +121,16 @@ In our example, we'll use rewriting to update our `add_one` UDF, to be rewritten To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result`. If the expression is _not_ to be rewritten `Transformed::no` is used to wrap the original `Expr`. If the expression _is_ to be rewritten, `Transformed::yes` is used to wrap the new `Expr`. ```rust -fn rewrite_add_one(expr: Expr) -> Result { +use datafusion::common::Result; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::logical_expr::{col, lit, Expr}; +use datafusion::logical_expr::{ScalarUDF}; + +fn rewrite_add_one(expr: Expr) -> Result> { expr.transform(&|expr| { Ok(match expr { - Expr::ScalarUDF(scalar_fun) if scalar_fun.fun.name == "add_one" => { - let input_arg = scalar_fun.args[0].clone(); + Expr::ScalarFunction(scalar_func) if scalar_func.func.inner().name() == "add_one" => { + let input_arg = scalar_func.args[0].clone(); let new_expression = input_arg + lit(1i64); Transformed::yes(new_expression) @@ -124,6 +151,27 @@ We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. The - `try_optimize` - takes a `LogicalPlan` and returns an `Option`. If the rule is able to optimize the plan, it returns `Some(LogicalPlan)` with the optimized plan. If the rule is not able to optimize the plan, it returns `None`. 
```rust +use std::sync::Arc; +use datafusion::common::Result; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::logical_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion::optimizer::{OptimizerRule, OptimizerConfig, OptimizerContext, Optimizer}; + +# fn rewrite_add_one(expr: Expr) -> Result> { +# expr.transform(&|expr| { +# Ok(match expr { +# Expr::ScalarFunction(scalar_func) if scalar_func.func.inner().name() == "add_one" => { +# let input_arg = scalar_func.args[0].clone(); +# let new_expression = input_arg + lit(1i64); +# +# Transformed::yes(new_expression) +# } +# _ => Transformed::no(expr), +# }) +# }) +# } + +#[derive(Default, Debug)] struct AddOneInliner {} impl OptimizerRule for AddOneInliner { @@ -131,23 +179,26 @@ impl OptimizerRule for AddOneInliner { "add_one_inliner" } - fn try_optimize( + fn rewrite( &self, - plan: &LogicalPlan, - config: &dyn OptimizerConfig, - ) -> Result> { + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { // Map over the expressions and rewrite them - let new_expressions = plan + let new_expressions: Vec = plan .expressions() .into_iter() .map(|expr| rewrite_add_one(expr)) - .collect::>>()?; + .collect::>>()? // returns Vec> + .into_iter() + .map(|transformed| transformed.data) + .collect(); let inputs = plan.inputs().into_iter().cloned().collect::>(); - let plan = plan.with_new_exprs(&new_expressions, &inputs); + let plan: Result = plan.with_new_exprs(new_expressions, inputs); - plan.map(Some) + plan.map(|p| Transformed::yes(p)) } } ``` @@ -161,25 +212,111 @@ We're almost there. Let's just test our rule works properly. Testing the rule is fairly simple, we can create a SessionState with our rule and then create a DataFrame and run a query. The logical plan will be optimized by our rule. ```rust -use datafusion::prelude::*; - -let rules = Arc::new(AddOneInliner {}); -let state = ctx.state().with_optimizer_rules(vec![rules]); - -let ctx = SessionContext::with_state(state); -ctx.register_udf(add_one); - -let sql = "SELECT add_one(1) AS added_one"; -let plan = ctx.sql(sql).await?.logical_plan(); - -println!("{:?}", plan); +# use std::sync::Arc; +# use datafusion::common::Result; +# use datafusion::common::tree_node::{Transformed, TreeNode}; +# use datafusion::logical_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; +# use datafusion::optimizer::{OptimizerRule, OptimizerConfig, OptimizerContext, Optimizer}; +# use datafusion::arrow::array::{ArrayRef, Int64Array}; +# use datafusion::common::cast::as_int64_array; +# use datafusion::logical_expr::ColumnarValue; +# use datafusion::logical_expr::{Volatility, create_udf}; +# use datafusion::arrow::datatypes::DataType; +# +# fn rewrite_add_one(expr: Expr) -> Result> { +# expr.transform(&|expr| { +# Ok(match expr { +# Expr::ScalarFunction(scalar_func) if scalar_func.func.inner().name() == "add_one" => { +# let input_arg = scalar_func.args[0].clone(); +# let new_expression = input_arg + lit(1i64); +# +# Transformed::yes(new_expression) +# } +# _ => Transformed::no(expr), +# }) +# }) +# } +# +# #[derive(Default, Debug)] +# struct AddOneInliner {} +# +# impl OptimizerRule for AddOneInliner { +# fn name(&self) -> &str { +# "add_one_inliner" +# } +# +# fn rewrite( +# &self, +# plan: LogicalPlan, +# _config: &dyn OptimizerConfig, +# ) -> Result> { +# // Map over the expressions and rewrite them +# let new_expressions: Vec = plan +# .expressions() +# .into_iter() +# .map(|expr| rewrite_add_one(expr)) +# .collect::>>()? 
// returns Vec> +# .into_iter() +# .map(|transformed| transformed.data) +# .collect(); +# +# let inputs = plan.inputs().into_iter().cloned().collect::>(); +# +# let plan: Result = plan.with_new_exprs(new_expressions, inputs); +# +# plan.map(|p| Transformed::yes(p)) +# } +# } +# +# pub fn add_one(args: &[ColumnarValue]) -> Result { +# // Error handling omitted for brevity +# let args = ColumnarValue::values_to_arrays(args)?; +# let i64s = as_int64_array(&args[0])?; +# +# let new_array = i64s +# .iter() +# .map(|array_elem| array_elem.map(|value| value + 1)) +# .collect::(); +# +# Ok(ColumnarValue::from(Arc::new(new_array) as ArrayRef)) +# } + +use datafusion::execution::context::SessionContext; + +#[tokio::main] +async fn main() -> Result<()> { + + let ctx = SessionContext::new(); + // ctx.add_optimizer_rule(Arc::new(AddOneInliner {})); + + let add_one_udf = create_udf( + "add_one", + vec![DataType::Int64], + DataType::Int64, + Volatility::Immutable, + Arc::new(add_one), + ); + ctx.register_udf(add_one_udf); + + let sql = "SELECT add_one(5) AS added_one"; + // let plan = ctx.sql(sql).await?.into_unoptimized_plan().clone(); + let plan = ctx.sql(sql).await?.into_optimized_plan()?.clone(); + + let expected = r#"Projection: Int64(6) AS added_one + EmptyRelation"#; + + assert_eq!(plan.to_string(), expected); + + Ok(()) +} ``` -This results in the following output: +This plan is optimized as: ```text -Projection: Int64(1) + Int64(1) AS added_one - EmptyRelation +Projection: add_one(Int64(5)) AS added_one + -> Projection: Int64(5) + Int64(1) AS added_one + -> Projection: Int64(6) AS added_one ``` I.e. the `add_one` UDF has been inlined into the projection. @@ -189,27 +326,23 @@ I.e. the `add_one` UDF has been inlined into the projection. The `arrow::datatypes::DataType` of the expression can be obtained by calling the `get_type` given something that implements `Expr::Schemable`, for example a `DFschema` object: ```rust -use arrow_schema::DataType; -use datafusion::common::{DFField, DFSchema}; +use arrow::datatypes::{DataType, Field}; +use datafusion::common::DFSchema; use datafusion::logical_expr::{col, ExprSchemable}; use std::collections::HashMap; +// Get the type of an expression that adds 2 columns. 
Adding an Int32 +// and Float32 results in Float32 type let expr = col("c1") + col("c2"); -let schema = DFSchema::new_with_metadata( +let schema = DFSchema::from_unqualified_fields( vec![ - DFField::new_unqualified("c1", DataType::Int32, true), - DFField::new_unqualified("c2", DataType::Float32, true), - ], + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Float32, true), + ] + .into(), HashMap::new(), -) -.unwrap(); -print!("type = {}", expr.get_type(&schema).unwrap()); -``` - -This results in the following output: - -```text -type = Float32 +).unwrap(); +assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap())); ``` ## Conclusion diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 6a620fc69252..fb238dad10bb 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -127,7 +127,7 @@ supports additional statements and commands: Show configuration options -```SQL +```sql > show all; +-------------------------------------------------+---------+ @@ -163,7 +163,7 @@ Show specific configuration option - Set configuration options -```SQL +```sql > SET datafusion.execution.batch_size to 1024; ``` diff --git a/docs/source/user-guide/crate-configuration.md b/docs/source/user-guide/crate-configuration.md index 9d22e3403097..f4a1910f5f78 100644 --- a/docs/source/user-guide/crate-configuration.md +++ b/docs/source/user-guide/crate-configuration.md @@ -68,7 +68,9 @@ codegen-units = 1 Then, in `main.rs.` update the memory allocator with the below after your imports: -```rust ,ignore + + +```no-run use datafusion::prelude::*; #[global_allocator] diff --git a/docs/source/user-guide/explain-usage.md b/docs/source/user-guide/explain-usage.md index 32a87ae9198d..d89ed5f0e7ea 100644 --- a/docs/source/user-guide/explain-usage.md +++ b/docs/source/user-guide/explain-usage.md @@ -49,7 +49,7 @@ LIMIT 5; The output will look like -``` +```text +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -70,7 +70,7 @@ Elapsed 0.060 seconds. There are two sections: logical plan and physical plan -- **Logical Plan:** is a plan generated for a specific SQL query, DataFrame, or other language without the +- **Logical Plan:** is a plan generated for a specific SQL query, DataFrame, or other language without the knowledge of the underlying data organization. - **Physical Plan:** is a plan generated from a logical plan along with consideration of the hardware configuration (e.g number of CPUs) and the underlying data organization (e.g number of files). @@ -87,7 +87,7 @@ query run faster depends on the reason it is slow and beyond the scope of this d A query plan is an upside down tree, and we always read from bottom up. 
The physical plan in Figure 1 in tree format will look like -``` +```text ▲ │ │ @@ -174,7 +174,7 @@ above but with `EXPLAIN ANALYZE` (note the output is edited for clarity) [`executionplan::metrics`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.metrics -``` +```sql > EXPLAIN ANALYZE SELECT "WatchID" AS wid, "hits.parquet"."ClientIP" AS ip FROM 'hits.parquet' WHERE starts_with("URL", 'http://domcheloveplanet.ru/') @@ -267,7 +267,7 @@ LIMIT 10; We can again see the query plan by using `EXPLAIN`: -``` +```sql > EXPLAIN SELECT "UserID", COUNT(*) FROM 'hits.parquet' GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10; +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md index 221bb0572eb8..7d88d3168d23 100644 --- a/docs/source/user-guide/sql/aggregate_functions.md +++ b/docs/source/user-guide/sql/aggregate_functions.md @@ -58,7 +58,7 @@ Aggregate functions operate on a set of values to compute a single result. Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order. -``` +```sql array_agg(expression [ORDER BY expression]) ``` @@ -81,7 +81,7 @@ array_agg(expression [ORDER BY expression]) Returns the average of numeric values in the specified column. -``` +```sql avg(expression) ``` @@ -108,7 +108,7 @@ avg(expression) Computes the bitwise AND of all non-null input values. -``` +```sql bit_and(expression) ``` @@ -120,7 +120,7 @@ bit_and(expression) Computes the bitwise OR of all non-null input values. -``` +```sql bit_or(expression) ``` @@ -132,7 +132,7 @@ bit_or(expression) Computes the bitwise exclusive OR of all non-null input values. -``` +```sql bit_xor(expression) ``` @@ -144,7 +144,7 @@ bit_xor(expression) Returns true if all non-null input values are true, otherwise false. -``` +```sql bool_and(expression) ``` @@ -167,7 +167,7 @@ bool_and(expression) Returns true if all non-null input values are true, otherwise false. -``` +```sql bool_and(expression) ``` @@ -190,7 +190,7 @@ bool_and(expression) Returns the number of non-null values in the specified column. To include null values in the total count, use `count(*)`. -``` +```sql count(expression) ``` @@ -220,7 +220,7 @@ count(expression) Returns the first element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group. -``` +```sql first_value(expression [ORDER BY expression]) ``` @@ -243,7 +243,7 @@ first_value(expression [ORDER BY expression]) Returns 1 if the data is aggregated across the specified column, or 0 if it is not aggregated in the result set. -``` +```sql grouping(expression) ``` @@ -270,7 +270,7 @@ grouping(expression) Returns the last element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group. 
-``` +```sql last_value(expression [ORDER BY expression]) ``` @@ -293,7 +293,7 @@ last_value(expression [ORDER BY expression]) Returns the maximum value in the specified column. -``` +```sql max(expression) ``` @@ -320,7 +320,7 @@ _Alias of [avg](#avg)._ Returns the median value in the specified column. -``` +```sql median(expression) ``` @@ -343,7 +343,7 @@ median(expression) Returns the minimum value in the specified column. -``` +```sql min(expression) ``` @@ -366,7 +366,7 @@ min(expression) Concatenates the values of string expressions and places separator values between them. -``` +```sql string_agg(expression, delimiter) ``` @@ -391,7 +391,7 @@ string_agg(expression, delimiter) Returns the sum of all values in the specified column. -``` +```sql sum(expression) ``` @@ -414,7 +414,7 @@ sum(expression) Returns the statistical sample variance of a set of numbers. -``` +```sql var(expression) ``` @@ -431,7 +431,7 @@ var(expression) Returns the statistical population variance of a set of numbers. -``` +```sql var_pop(expression) ``` @@ -479,7 +479,7 @@ _Alias of [var](#var)._ Returns the coefficient of correlation between two numeric values. -``` +```sql corr(expression1, expression2) ``` @@ -507,7 +507,7 @@ _Alias of [covar_samp](#covar_samp)._ Returns the sample covariance of a set of number pairs. -``` +```sql covar_samp(expression1, expression2) ``` @@ -531,7 +531,7 @@ covar_samp(expression1, expression2) Returns the sample covariance of a set of number pairs. -``` +```sql covar_samp(expression1, expression2) ``` @@ -559,7 +559,7 @@ covar_samp(expression1, expression2) Returns the nth value in a group of values. -``` +```sql nth_value(expression, n ORDER BY expression) ``` @@ -588,7 +588,7 @@ nth_value(expression, n ORDER BY expression) Computes the average of the independent variable (input) expression_x for the non-null paired data points. -``` +```sql regr_avgx(expression_y, expression_x) ``` @@ -601,7 +601,7 @@ regr_avgx(expression_y, expression_x) Computes the average of the dependent variable (output) expression_y for the non-null paired data points. -``` +```sql regr_avgy(expression_y, expression_x) ``` @@ -614,7 +614,7 @@ regr_avgy(expression_y, expression_x) Counts the number of non-null paired data points. -``` +```sql regr_count(expression_y, expression_x) ``` @@ -627,7 +627,7 @@ regr_count(expression_y, expression_x) Computes the y-intercept of the linear regression line. For the equation (y = kx + b), this function returns b. -``` +```sql regr_intercept(expression_y, expression_x) ``` @@ -640,7 +640,7 @@ regr_intercept(expression_y, expression_x) Computes the square of the correlation coefficient between the independent and dependent variables. -``` +```sql regr_r2(expression_y, expression_x) ``` @@ -653,7 +653,7 @@ regr_r2(expression_y, expression_x) Returns the slope of the linear regression line for non-null pairs in aggregate columns. Given input column Y and X: regr_slope(Y, X) returns the slope (k in Y = k\*X + b) using minimal RSS fitting. -``` +```sql regr_slope(expression_y, expression_x) ``` @@ -666,7 +666,7 @@ regr_slope(expression_y, expression_x) Computes the sum of squares of the independent variable. -``` +```sql regr_sxx(expression_y, expression_x) ``` @@ -679,7 +679,7 @@ regr_sxx(expression_y, expression_x) Computes the sum of products of paired data points. -``` +```sql regr_sxy(expression_y, expression_x) ``` @@ -692,7 +692,7 @@ regr_sxy(expression_y, expression_x) Computes the sum of squares of the dependent variable. 
-``` +```sql regr_syy(expression_y, expression_x) ``` @@ -705,7 +705,7 @@ regr_syy(expression_y, expression_x) Returns the standard deviation of a set of numbers. -``` +```sql stddev(expression) ``` @@ -732,7 +732,7 @@ stddev(expression) Returns the population standard deviation of a set of numbers. -``` +```sql stddev_pop(expression) ``` @@ -766,7 +766,7 @@ _Alias of [stddev](#stddev)._ Returns the approximate number of distinct input values calculated using the HyperLogLog algorithm. -``` +```sql approx_distinct(expression) ``` @@ -789,7 +789,7 @@ approx_distinct(expression) Returns the approximate median (50th percentile) of input values. It is an alias of `approx_percentile_cont(x, 0.5)`. -``` +```sql approx_median(expression) ``` @@ -812,7 +812,7 @@ approx_median(expression) Returns the approximate percentile of input values using the t-digest algorithm. -``` +```sql approx_percentile_cont(expression, percentile, centroids) ``` @@ -837,7 +837,7 @@ approx_percentile_cont(expression, percentile, centroids) Returns the weighted approximate percentile of input values using the t-digest algorithm. -``` +```sql approx_percentile_cont_with_weight(expression, weight, percentile) ``` diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md index e16b9681eb80..71475cff9a39 100644 --- a/docs/source/user-guide/sql/ddl.md +++ b/docs/source/user-guide/sql/ddl.md @@ -55,7 +55,7 @@ file system or remote object store as a named table which can be queried. The supported syntax is: -``` +```sql CREATE [UNBOUNDED] EXTERNAL TABLE [ IF NOT EXISTS ] [ () ] @@ -185,7 +185,7 @@ OPTIONS ('has_header' 'true'); Where `WITH ORDER` clause specifies the sort order: -``` +```sql WITH ORDER (sort_expression1 [ASC | DESC] [NULLS { FIRST | LAST }] [, sort_expression2 [ASC | DESC] [NULLS { FIRST | LAST }] ...]) ``` @@ -198,7 +198,7 @@ WITH ORDER (sort_expression1 [ASC | DESC] [NULLS { FIRST | LAST }] If data sources are already partitioned in Hive style, `PARTITIONED BY` can be used for partition pruning. -``` +```text /mnt/nyctaxi/year=2022/month=01/tripdata.parquet /mnt/nyctaxi/year=2021/month=12/tripdata.parquet /mnt/nyctaxi/year=2021/month=11/tripdata.parquet diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md index dd016cabbfb7..4eda59d6dea1 100644 --- a/docs/source/user-guide/sql/dml.md +++ b/docs/source/user-guide/sql/dml.md @@ -28,7 +28,7 @@ Copies the contents of a table or query to file(s). Supported file formats are `parquet`, `csv`, `json`, and `arrow`.
-COPY { table_name | query } 
+COPY { table_name | query }
 TO 'file_name'
 [ STORED AS format ]
 [ PARTITIONED BY column_name [, ...] ]
@@ -91,7 +91,7 @@ of hive-style partitioned parquet files:
 If the data contains values of `x` and `y` in column1 and only `a` in
 column2, output files will appear in the following directory structure:
 
-```
+```text
 dir_name/
   column1=x/
     column2=a/
diff --git a/docs/source/user-guide/sql/explain.md b/docs/source/user-guide/sql/explain.md
index 709e6311c28e..3f2c7de43eac 100644
--- a/docs/source/user-guide/sql/explain.md
+++ b/docs/source/user-guide/sql/explain.md
@@ -32,8 +32,9 @@ EXPLAIN [ANALYZE] [VERBOSE] statement
 Shows the execution plan of a statement.
 If you need more detailed output, use `EXPLAIN VERBOSE`.
 
-```
+```sql
 EXPLAIN SELECT SUM(x) FROM table GROUP BY b;
+
 +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | plan_type     | plan                                                                                                                                                           |
 +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
@@ -56,8 +57,9 @@ EXPLAIN SELECT SUM(x) FROM table GROUP BY b;
 Shows the execution plan and metrics of a statement.
 If you need more information output, use `EXPLAIN ANALYZE VERBOSE`.
 
-```
+```sql
 EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b;
+
 +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
 | plan_type         | plan                                                                                                                                                      |
 +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index b769b8b7bdb0..b14bf5b2cc91 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -73,7 +73,7 @@ dev/update_function_docs.sh file for updating surrounding text.
 
 Returns the absolute value of a number.
 
-```
+```sql
 abs(numeric_expression)
 ```
 
@@ -85,7 +85,7 @@ abs(numeric_expression)
 
 Returns the arc cosine or inverse cosine of a number.
 
-```
+```sql
 acos(numeric_expression)
 ```
 
@@ -97,7 +97,7 @@ acos(numeric_expression)
 
 Returns the area hyperbolic cosine or inverse hyperbolic cosine of a number.
 
-```
+```sql
 acosh(numeric_expression)
 ```
 
@@ -109,7 +109,7 @@ acosh(numeric_expression)
 
 Returns the arc sine or inverse sine of a number.
 
-```
+```sql
 asin(numeric_expression)
 ```
 
@@ -121,7 +121,7 @@ asin(numeric_expression)
 
 Returns the area hyperbolic sine or inverse hyperbolic sine of a number.
 
-```
+```sql
 asinh(numeric_expression)
 ```
 
@@ -133,7 +133,7 @@ asinh(numeric_expression)
 
 Returns the arc tangent or inverse tangent of a number.
 
-```
+```sql
 atan(numeric_expression)
 ```
 
@@ -145,7 +145,7 @@ atan(numeric_expression)
 
 Returns the arc tangent or inverse tangent of `expression_y / expression_x`.
 
-```
+```sql
 atan2(expression_y, expression_x)
 ```
 
@@ -160,7 +160,7 @@ atan2(expression_y, expression_x)
 
 Returns the area hyperbolic tangent or inverse hyperbolic tangent of a number.
 
-```
+```sql
 atanh(numeric_expression)
 ```
 
@@ -172,7 +172,7 @@ atanh(numeric_expression)
 
 Returns the cube root of a number.
 
-```
+```sql
 cbrt(numeric_expression)
 ```
 
@@ -184,7 +184,7 @@ cbrt(numeric_expression)
 
 Returns the nearest integer greater than or equal to a number.
 
-```
+```sql
 ceil(numeric_expression)
 ```
 
@@ -196,7 +196,7 @@ ceil(numeric_expression)
 
 Returns the cosine of a number.
 
-```
+```sql
 cos(numeric_expression)
 ```
 
@@ -208,7 +208,7 @@ cos(numeric_expression)
 
 Returns the hyperbolic cosine of a number.
 
-```
+```sql
 cosh(numeric_expression)
 ```
 
@@ -220,7 +220,7 @@ cosh(numeric_expression)
 
 Returns the cotangent of a number.
 
-```
+```sql
 cot(numeric_expression)
 ```
 
@@ -232,7 +232,7 @@ cot(numeric_expression)
 
 Converts radians to degrees.
 
-```
+```sql
 degrees(numeric_expression)
 ```
 
@@ -244,7 +244,7 @@ degrees(numeric_expression)
 
 Returns the base-e exponential of a number.
 
-```
+```sql
 exp(numeric_expression)
 ```
 
@@ -256,7 +256,7 @@ exp(numeric_expression)
 
 Factorial. Returns 1 if value is less than 2.
 
-```
+```sql
 factorial(numeric_expression)
 ```
 
@@ -268,7 +268,7 @@ factorial(numeric_expression)
 
 Returns the nearest integer less than or equal to a number.
 
-```
+```sql
 floor(numeric_expression)
 ```
 
@@ -280,7 +280,7 @@ floor(numeric_expression)
 
 Returns the greatest common divisor of `expression_x` and `expression_y`. Returns 0 if both inputs are zero.
 
-```
+```sql
 gcd(expression_x, expression_y)
 ```
 
@@ -293,7 +293,7 @@ gcd(expression_x, expression_y)
 
 Returns true if a given number is +NaN or -NaN, otherwise returns false.
 
-```
+```sql
 isnan(numeric_expression)
 ```
 
@@ -305,7 +305,7 @@ isnan(numeric_expression)
 
 Returns true if a given number is +0.0 or -0.0, otherwise returns false.
 
-```
+```sql
 iszero(numeric_expression)
 ```
 
@@ -317,7 +317,7 @@ iszero(numeric_expression)
 
 Returns the least common multiple of `expression_x` and `expression_y`. Returns 0 if either input is zero.
 
-```
+```sql
 lcm(expression_x, expression_y)
 ```
 
@@ -330,7 +330,7 @@ lcm(expression_x, expression_y)
 
 Returns the natural logarithm of a number.
 
-```
+```sql
 ln(numeric_expression)
 ```
 
@@ -342,7 +342,7 @@ ln(numeric_expression)
 
 Returns the base-x logarithm of a number. A base can be provided as the first argument; if it is omitted, the base-10 logarithm of the number is returned.
 
-```
+```sql
 log(base, numeric_expression)
 log(numeric_expression)
 ```
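
For illustration, both call forms can be used as follows (the literal values are arbitrary):

```sql
-- two-argument form: logarithm of 64 in base 2
SELECT log(2, 64);  -- 6
-- one-argument form: base-10 logarithm
SELECT log(100);    -- 2
```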
@@ -356,7 +356,7 @@ log(numeric_expression)
 
 Returns the base-10 logarithm of a number.
 
-```
+```sql
 log10(numeric_expression)
 ```
 
@@ -368,7 +368,7 @@ log10(numeric_expression)
 
 Returns the base-2 logarithm of a number.
 
-```
+```sql
 log2(numeric_expression)
 ```
 
@@ -381,7 +381,7 @@ log2(numeric_expression)
 Returns the first argument if it's not _NaN_.
 Returns the second argument otherwise.
 
-```
+```sql
 nanvl(expression_x, expression_y)
 ```
 
@@ -394,7 +394,7 @@ nanvl(expression_x, expression_y)
 
 Returns an approximate value of π.
 
-```
+```sql
 pi()
 ```
 
@@ -406,7 +406,7 @@ _Alias of [power](#power)._
 
 Returns a base expression raised to the power of an exponent.
 
-```
+```sql
 power(base, exponent)
 ```
 
@@ -423,7 +423,7 @@ power(base, exponent)
 
 Converts degrees to radians.
 
-```
+```sql
 radians(numeric_expression)
 ```
 
@@ -436,7 +436,7 @@ radians(numeric_expression)
 Returns a random float value in the range [0, 1).
 The random seed is unique to each row.
 
-```
+```sql
 random()
 ```
 
@@ -444,7 +444,7 @@ random()
 
 Rounds a number to the nearest integer.
 
-```
+```sql
 round(numeric_expression[, decimal_places])
 ```
 
@@ -459,7 +459,7 @@ Returns the sign of a number.
 Negative numbers return `-1`.
 Zero and positive numbers return `1`.
 
-```
+```sql
 signum(numeric_expression)
 ```
 
@@ -471,7 +471,7 @@ signum(numeric_expression)
 
 Returns the sine of a number.
 
-```
+```sql
 sin(numeric_expression)
 ```
 
@@ -483,7 +483,7 @@ sin(numeric_expression)
 
 Returns the hyperbolic sine of a number.
 
-```
+```sql
 sinh(numeric_expression)
 ```
 
@@ -495,7 +495,7 @@ sinh(numeric_expression)
 
 Returns the square root of a number.
 
-```
+```sql
 sqrt(numeric_expression)
 ```
 
@@ -507,7 +507,7 @@ sqrt(numeric_expression)
 
 Returns the tangent of a number.
 
-```
+```sql
 tan(numeric_expression)
 ```
 
@@ -519,7 +519,7 @@ tan(numeric_expression)
 
 Returns the hyperbolic tangent of a number.
 
-```
+```sql
 tanh(numeric_expression)
 ```
 
@@ -531,7 +531,7 @@ tanh(numeric_expression)
 
 Truncates a number to a whole number, or to the specified number of decimal places.
 
-```
+```sql
 trunc(numeric_expression[, decimal_places])
 ```
 
@@ -558,7 +558,7 @@ trunc(numeric_expression[, decimal_places])
 
 Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.
 
-```
+```sql
 coalesce(expression1[, ..., expression_n])
 ```
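
A small illustrative query showing the fallback behaviour (the values are arbitrary):

```sql
-- the first two arguments are NULL, so 'default' is returned
SELECT coalesce(NULL, NULL, 'default');
```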
 
@@ -581,7 +581,7 @@ coalesce(expression1[, ..., expression_n])
 
 Returns the greatest value in a list of expressions. Returns _null_ if all expressions are _null_.
 
-```
+```sql
 greatest(expression1[, ..., expression_n])
 ```
 
@@ -608,7 +608,7 @@ _Alias of [nvl](#nvl)._
 
 Returns the smallest value in a list of expressions. Returns _null_ if all expressions are _null_.
 
-```
+```sql
 least(expression1[, ..., expression_n])
 ```
 
@@ -632,7 +632,7 @@ least(expression1[, ..., expression_n])
 Returns _null_ if _expression1_ equals _expression2_; otherwise it returns _expression1_.
 This can be used to perform the inverse operation of [`coalesce`](#coalesce).
 
-```
+```sql
 nullif(expression1, expression2)
 ```
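
For illustration (arbitrary string values):

```sql
-- equal arguments yield NULL
SELECT nullif('datafusion', 'datafusion');  -- NULL
-- unequal arguments return the first expression
SELECT nullif('datafusion', 'arrow');       -- 'datafusion'
```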
 
@@ -662,7 +662,7 @@ nullif(expression1, expression2)
 
 Returns _expression2_ if _expression1_ is NULL otherwise it returns _expression1_.
 
-```
+```sql
 nvl(expression1, expression2)
 ```
 
@@ -696,7 +696,7 @@ nvl(expression1, expression2)
 
 Returns _expression2_ if _expression1_ is not NULL; otherwise it returns _expression3_.
 
-```
+```sql
 nvl2(expression1, expression2, expression3)
 ```
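
A short sketch of both branches (the values are arbitrary):

```sql
-- first argument is NULL, so the third argument is returned
SELECT nvl2(NULL, 'present', 'absent');  -- 'absent'
-- first argument is not NULL, so the second argument is returned
SELECT nvl2(42, 'present', 'absent');    -- 'present'
```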
 
@@ -769,7 +769,7 @@ nvl2(expression1, expression2, expression3)
 
 Returns the Unicode character code of the first character in a string.
 
-```
+```sql
 ascii(str)
 ```
 
@@ -802,7 +802,7 @@ ascii(str)
 
 Returns the bit length of a string.
 
-```
+```sql
 bit_length(str)
 ```
 
@@ -830,7 +830,7 @@ bit_length(str)
 
 Trims the specified trim string from the start and end of a string. If no trim string is provided, all whitespace is removed from the start and end of the input string.
 
-```
+```sql
 btrim(str[, trim_str])
 ```
 
@@ -877,7 +877,7 @@ _Alias of [character_length](#character_length)._
 
 Returns the number of characters in a string.
 
-```
+```sql
 character_length(str)
 ```
 
@@ -910,7 +910,7 @@ character_length(str)
 
 Returns the character with the specified ASCII or Unicode code value.
 
-```
+```sql
 chr(expression)
 ```
 
@@ -937,7 +937,7 @@ chr(expression)
 
 Concatenates multiple strings together.
 
-```
+```sql
 concat(str[, ..., str_n])
 ```
 
@@ -965,7 +965,7 @@ concat(str[, ..., str_n])
 
 Concatenates multiple strings together with a specified separator.
 
-```
+```sql
 concat_ws(separator, str[, ..., str_n])
 ```
 
@@ -994,7 +994,7 @@ concat_ws(separator, str[, ..., str_n])
 
 Return true if search_str is found within string (case-sensitive).
 
-```
+```sql
 contains(str, search_str)
 ```
 
@@ -1018,7 +1018,7 @@ contains(str, search_str)
 
 Tests if a string ends with a substring.
 
-```
+```sql
 ends_with(str, substr)
 ```
 
@@ -1048,7 +1048,7 @@ ends_with(str, substr)
 
 Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings.
 
-```
+```sql
 find_in_set(str, strlist)
 ```
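
For example, with an arbitrary comma-separated list:

```sql
-- 'b' is the second substring in the list, so 2 is returned
SELECT find_in_set('b', 'a,b,c,d');  -- 2
```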
 
@@ -1072,7 +1072,7 @@ find_in_set(str, strlist)
 
 Capitalizes the first character in each word in the input string. Words are delimited by non-alphanumeric characters.
 
-```
+```sql
 initcap(str)
 ```
 
@@ -1104,7 +1104,7 @@ _Alias of [strpos](#strpos)._
 
 Returns a specified number of characters from the left side of a string.
 
-```
+```sql
 left(str, n)
 ```
 
@@ -1136,7 +1136,7 @@ _Alias of [character_length](#character_length)._
 
 Returns the [`Levenshtein distance`](https://en.wikipedia.org/wiki/Levenshtein_distance) between the two given strings.
 
-```
+```sql
 levenshtein(str1, str2)
 ```
 
@@ -1160,7 +1160,7 @@ levenshtein(str1, str2)
 
 Converts a string to lower-case.
 
-```
+```sql
 lower(str)
 ```
 
@@ -1188,7 +1188,7 @@ lower(str)
 
 Pads the left side of a string with another string to a specified string length.
 
-```
+```sql
 lpad(str, n[, padding_str])
 ```
 
@@ -1217,7 +1217,7 @@ lpad(str, n[, padding_str])
 
 Trims the specified trim string from the beginning of a string. If no trim string is provided, all whitespace is removed from the start of the input string.
 
-```
+```sql
 ltrim(str[, trim_str])
 ```
 
@@ -1258,7 +1258,7 @@ trim(LEADING trim_str FROM str)
 
 Returns the length of a string in bytes.
 
-```
+```sql
 octet_length(str)
 ```
 
@@ -1290,7 +1290,7 @@ _Alias of [strpos](#strpos)._
 
 Returns a string with an input string repeated a specified number of times.
 
-```
+```sql
 repeat(str, n)
 ```
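
For illustration:

```sql
SELECT repeat('data', 3);  -- 'datadatadata'
```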
 
@@ -1314,7 +1314,7 @@ repeat(str, n)
 
 Replaces all occurrences of a specified substring in a string with a new substring.
 
-```
+```sql
 replace(str, substr, replacement)
 ```
 
@@ -1339,7 +1339,7 @@ replace(str, substr, replacement)
 
 Reverses the character order of a string.
 
-```
+```sql
 reverse(str)
 ```
 
@@ -1362,7 +1362,7 @@ reverse(str)
 
 Returns a specified number of characters from the right side of a string.
 
-```
+```sql
 right(str, n)
 ```
 
@@ -1390,7 +1390,7 @@ right(str, n)
 
 Pads the right side of a string with another string to a specified string length.
 
-```
+```sql
 rpad(str, n[, padding_str])
 ```
 
@@ -1419,7 +1419,7 @@ rpad(str, n[, padding_str])
 
 Trims the specified trim string from the end of a string. If no trim string is provided, all whitespace is removed from the end of the input string.
 
-```
+```sql
 rtrim(str[, trim_str])
 ```
 
@@ -1460,7 +1460,7 @@ trim(TRAILING trim_str FROM str)
 
 Splits a string based on a specified delimiter and returns the substring in the specified position.
 
-```
+```sql
 split_part(str, delimiter, pos)
 ```
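
For illustration, with an arbitrary dot-delimited string:

```sql
-- the second dot-delimited component is returned
SELECT split_part('1.2.3', '.', 2);  -- '2'
```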
 
@@ -1485,7 +1485,7 @@ split_part(str, delimiter, pos)
 
 Tests if a string starts with a substring.
 
-```
+```sql
 starts_with(str, substr)
 ```
 
@@ -1509,7 +1509,7 @@ starts_with(str, substr)
 
 Returns the starting position of a specified substring in a string. Positions begin at 1. If the substring does not exist in the string, the function returns 0.
 
-```
+```sql
 strpos(str, substr)
 ```
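
For illustration:

```sql
-- 'fus' starts at character position 5 of 'datafusion'
SELECT strpos('datafusion', 'fus');  -- 5
```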
 
@@ -1544,7 +1544,7 @@ position(substr in origstr)
 
 Extracts a substring of a specified number of characters from a specific starting position in a string.
 
-```
+```sql
 substr(str, start_pos[, length])
 ```
 
@@ -1581,7 +1581,7 @@ Returns the substring from str before count occurrences of the delimiter delim.
 If count is positive, everything to the left of the final delimiter (counting from the left) is returned.
 If count is negative, everything to the right of the final delimiter (counting from the right) is returned.
 
-```
+```sql
 substr_index(str, delim, count)
 ```
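
A short sketch of both the positive and negative count cases (the input string is arbitrary):

```sql
-- everything to the left of the first '.'
SELECT substr_index('www.apache.org', '.', 1);   -- 'www'
-- everything to the right of the last '.'
SELECT substr_index('www.apache.org', '.', -1);  -- 'org'
```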
 
@@ -1624,7 +1624,7 @@ _Alias of [substr_index](#substr_index)._
 
 Converts an integer to a hexadecimal string.
 
-```
+```sql
 to_hex(int)
 ```
 
@@ -1647,7 +1647,7 @@ to_hex(int)
 
 Translates characters in a string to specified translation characters.
 
-```
+```sql
 translate(str, chars, translation)
 ```
 
@@ -1676,7 +1676,7 @@ _Alias of [btrim](#btrim)._
 
 Converts a string to upper-case.
 
-```
+```sql
 upper(str)
 ```
 
@@ -1704,7 +1704,7 @@ upper(str)
 
 Returns a [`UUID v4`]() string value which is unique per row.
 
-```
+```sql
 uuid()
 ```
 
@@ -1728,7 +1728,7 @@ uuid()
 
 Decodes binary data from a textual representation in a string.
 
-```
+```sql
 decode(expression, format)
 ```
 
@@ -1745,7 +1745,7 @@ decode(expression, format)
 
 Encodes binary data into a textual representation.
 
-```
+```sql
 encode(expression, format)
 ```
 
@@ -1774,7 +1774,7 @@ The following regular expression functions are supported:
 
 Returns the number of matches that a [regular expression](https://docs.rs/regex/latest/regex/#syntax) has in a string.
 
-```
+```sql
 regexp_count(str, regexp[, start, flags])
 ```
 
@@ -1805,7 +1805,7 @@ regexp_count(str, regexp[, start, flags])
 
 Returns true if a [regular expression](https://docs.rs/regex/latest/regex/#syntax) has at least one match in a string, false otherwise.
 
-```
+```sql
 regexp_like(str, regexp[, flags])
 ```
 
@@ -1843,7 +1843,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Returns the first [regular expression](https://docs.rs/regex/latest/regex/#syntax) matches in a string.
 
-```
+```sql
 regexp_match(str, regexp[, flags])
 ```
 
@@ -1882,7 +1882,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Replaces substrings in a string that match a [regular expression](https://docs.rs/regex/latest/regex/#syntax).
 
-```
+```sql
 regexp_replace(str, regexp, replacement[, flags])
 ```
 
@@ -1950,7 +1950,7 @@ Returns the current UTC date.
 
 The `current_date()` return value is determined at query time and will return the same date, no matter when in the query plan the function executes.
 
-```
+```sql
 current_date()
 ```
 
@@ -1964,7 +1964,7 @@ Returns the current UTC time.
 
 The `current_time()` return value is determined at query time and will return the same time, no matter when in the query plan the function executes.
 
-```
+```sql
 current_time()
 ```
 
@@ -1978,7 +1978,7 @@ Calculates time intervals and returns the start of the interval nearest to the s
 
 For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
 
-```
+```sql
 date_bin(interval, expression, origin-timestamp)
 ```
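
The 15 minute binning described above can be written as follows (the origin timestamp shown here is the unix epoch, chosen purely for illustration):

```sql
-- bins 2023-01-01T18:18:18Z into the 15 minute interval starting at 18:15:00
SELECT date_bin(INTERVAL '15 minutes', TIMESTAMP '2023-01-01T18:18:18Z', TIMESTAMP '1970-01-01T00:00:00Z');
```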
 
@@ -2034,7 +2034,7 @@ _Alias of [to_char](#to_char)._
 
 Returns the specified part of the date as an integer.
 
-```
+```sql
 date_part(part, expression)
 ```
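
For illustration (arbitrary timestamp):

```sql
-- extracts the hour component
SELECT date_part('hour', TIMESTAMP '2023-01-01 18:18:18');  -- 18
```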
 
@@ -2073,7 +2073,7 @@ extract(field FROM source)
 
 Truncates a timestamp value to a specified precision.
 
-```
+```sql
 date_trunc(precision, expression)
 ```
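
For illustration (arbitrary timestamp):

```sql
-- truncates to the start of the hour: 2023-01-01 18:00:00
SELECT date_trunc('hour', TIMESTAMP '2023-01-01 18:18:18');
```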
 
@@ -2108,7 +2108,7 @@ _Alias of [date_trunc](#date_trunc)._
 
 Converts an integer to RFC3339 timestamp format (`YYYY-MM-DDT00:00:00.000000000Z`). Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`) and return the corresponding timestamp.
 
-```
+```sql
 from_unixtime(expression[, timezone])
 ```
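
A minimal sketch, using an arbitrary epoch value and the optional timezone argument from the signature above:

```sql
SELECT from_unixtime(1599572549, 'America/New_York');
```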
 
@@ -2132,7 +2132,7 @@ from_unixtime(expression[, timezone])
 
 Makes a date from year/month/day component parts.
 
-```
+```sql
 make_date(year, month, day)
 ```
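
For illustration:

```sql
SELECT make_date(2023, 1, 31);  -- 2023-01-31
```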
 
@@ -2167,7 +2167,7 @@ Returns the current UTC timestamp.
 
 The `now()` return value is determined at query time and will return the same timestamp, no matter when in the query plan the function executes.
 
-```
+```sql
 now()
 ```
 
@@ -2179,7 +2179,7 @@ now()
 
 Returns a string representation of a date, time, timestamp or duration based on a [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html). Unlike the PostgreSQL equivalent of this function, numerical formatting is not supported.
 
-```
+```sql
 to_char(expression, format)
 ```
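
For illustration, using an arbitrary date and a Chrono format string:

```sql
SELECT to_char(DATE '2023-03-01', '%d-%m-%Y');  -- '01-03-2023'
```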
 
@@ -2216,7 +2216,7 @@ Returns the corresponding date.
 
 Note: `to_date` returns Date32, which represents its values as the number of days since the unix epoch (`1970-01-01`) stored as a signed 32 bit value. The largest supported date value is `9999-12-31`.
 
-```
+```sql
 to_date('2017-05-31', '%Y-%m-%d')
 ```
 
@@ -2250,7 +2250,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.
 
-```
+```sql
 to_local_time(expression)
 ```
 
@@ -2313,7 +2313,7 @@ Converts a value to a timestamp (`YYYY-MM-DDT00:00:00Z`). Supports strings, inte
 
 Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` for the input outside of supported bounds.
 
-```
+```sql
 to_timestamp(expression[, ..., format_n])
 ```
 
@@ -2345,7 +2345,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.
 
-```
+```sql
 to_timestamp_micros(expression[, ..., format_n])
 ```
 
@@ -2377,7 +2377,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided. Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.
 
-```
+```sql
 to_timestamp_millis(expression[, ..., format_n])
 ```
 
@@ -2409,7 +2409,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.
 
-```
+```sql
 to_timestamp_nanos(expression[, ..., format_n])
 ```
 
@@ -2441,7 +2441,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.
 
-```
+```sql
 to_timestamp_seconds(expression[, ..., format_n])
 ```
 
@@ -2473,7 +2473,7 @@ Additional examples can be found [here](https://github.com/apache/datafusion/blo
 
 Converts a value to seconds since the unix epoch (`1970-01-01T00:00:00Z`). Supports strings, dates, timestamps and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
 
-```
+```sql
 to_unixtime(expression[, ..., format_n])
 ```
 
@@ -2600,7 +2600,7 @@ _Alias of [current_date](#current_date)._
 
 Returns the first non-null element in the array.
 
-```
+```sql
 array_any_value(array)
 ```
 
@@ -2627,7 +2627,7 @@ array_any_value(array)
 
 Appends an element to the end of an array.
 
-```
+```sql
 array_append(array, element)
 ```
 
@@ -2661,7 +2661,7 @@ _Alias of [array_concat](#array_concat)._
 
 Concatenates arrays.
 
-```
+```sql
 array_concat(array[, ..., array_n])
 ```
 
@@ -2695,7 +2695,7 @@ _Alias of [array_has](#array_has)._
 
 Returns an array of the array's dimensions.
 
-```
+```sql
 array_dims(array)
 ```
 
@@ -2722,7 +2722,7 @@ array_dims(array)
 
 Returns the Euclidean distance between two input arrays of equal length.
 
-```
+```sql
 array_distance(array1, array2)
 ```
 
@@ -2750,7 +2750,7 @@ array_distance(array1, array2)
 
 Returns distinct values from the array after removing duplicates.
 
-```
+```sql
 array_distinct(array)
 ```
 
@@ -2777,7 +2777,7 @@ array_distinct(array)
 
 Extracts the element with the index n from the array.
 
-```
+```sql
 array_element(array, index)
 ```
 
@@ -2811,7 +2811,7 @@ _Alias of [empty](#empty)._
 
 Returns an array of the elements that appear in the first array but not in the second.
 
-```
+```sql
 array_except(array1, array2)
 ```
 
@@ -2849,7 +2849,7 @@ _Alias of [array_element](#array_element)._
 
 Returns true if the array contains the element.
 
-```
+```sql
 array_has(array, element)
 ```
 
@@ -2879,7 +2879,7 @@ array_has(array, element)
 
 Returns true if all elements of sub-array exist in array.
 
-```
+```sql
 array_has_all(array, sub-array)
 ```
 
@@ -2907,7 +2907,7 @@ array_has_all(array, sub-array)
 
 Returns true if any elements exist in both arrays.
 
-```
+```sql
 array_has_any(array, sub-array)
 ```
 
@@ -2940,7 +2940,7 @@ _Alias of [array_position](#array_position)._
 
 Returns an array of elements in the intersection of array1 and array2.
 
-```
+```sql
 array_intersect(array1, array2)
 ```
 
@@ -2978,7 +2978,7 @@ _Alias of [array_to_string](#array_to_string)._
 
 Returns the length of the array dimension.
 
-```
+```sql
 array_length(array, dimension)
 ```
 
@@ -3006,7 +3006,7 @@ array_length(array, dimension)
 
 Returns the number of dimensions of the array.
 
-```
+```sql
 array_ndims(array, element)
 ```
 
@@ -3034,7 +3034,7 @@ array_ndims(array, element)
 
 Returns the array without the last element.
 
-```
+```sql
 array_pop_back(array)
 ```
 
@@ -3061,7 +3061,7 @@ array_pop_back(array)
 
 Returns the array without the first element.
 
-```
+```sql
 array_pop_front(array)
 ```
 
@@ -3088,7 +3088,7 @@ array_pop_front(array)
 
 Returns the position of the first occurrence of the specified element in the array.
 
-```
+```sql
 array_position(array, element)
 array_position(array, element, index)
 ```
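
Illustrative calls (not part of this patch) showing the optional starting index:

```sql
SELECT array_position(make_array(1, 2, 2, 3), 2);    -- 2
SELECT array_position(make_array(1, 2, 2, 3), 2, 3); -- 3 (search starts at position 3)
```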
@@ -3126,7 +3126,7 @@ array_position(array, element, index)
 
 Searches for an element in the array and returns all occurrences.
 
-```
+```sql
 array_positions(array, element)
 ```
 
@@ -3154,7 +3154,7 @@ array_positions(array, element)
 
 Prepends an element to the beginning of an array.
 
-```
+```sql
 array_prepend(element, array)
 ```
 
@@ -3192,7 +3192,7 @@ _Alias of [array_prepend](#array_prepend)._
 
 Removes the first element from the array equal to the given value.
 
-```
+```sql
 array_remove(array, element)
 ```
 
@@ -3220,7 +3220,7 @@ array_remove(array, element)
 
 Removes all elements from the array equal to the given value.
 
-```
+```sql
 array_remove_all(array, element)
 ```
 
@@ -3248,7 +3248,7 @@ array_remove_all(array, element)
 
 Removes the first `max` elements from the array equal to the given value.
 
-```
+```sql
 array_remove_n(array, element, max)
 ```
 
@@ -3277,7 +3277,7 @@ array_remove_n(array, element, max))
 
 Returns an array containing element `count` times.
 
-```
+```sql
 array_repeat(element, count)
 ```
 
@@ -3311,7 +3311,7 @@ array_repeat(element, count)
 
 Replaces the first occurrence of the specified element with another specified element.
 
-```
+```sql
 array_replace(array, from, to)
 ```
 
@@ -3340,7 +3340,7 @@ array_replace(array, from, to)
 
 Replaces all occurrences of the specified element with another specified element.
 
-```
+```sql
 array_replace_all(array, from, to)
 ```
 
@@ -3369,7 +3369,7 @@ array_replace_all(array, from, to)
 
 Replaces the first `max` occurrences of the specified element with another specified element.
 
-```
+```sql
 array_replace_n(array, from, to, max)
 ```
 
@@ -3399,7 +3399,7 @@ array_replace_n(array, from, to, max)
 
 Resizes the list to contain size elements. New elements are initialized with value, or left empty if value is not set.
 
-```
+```sql
 array_resize(array, size, value)
 ```
 
@@ -3428,7 +3428,7 @@ array_resize(array, size, value)
 
 Returns the array with the order of the elements reversed.
 
-```
+```sql
 array_reverse(array)
 ```
 
@@ -3455,7 +3455,7 @@ array_reverse(array)
 
 Returns a slice of the array based on 1-indexed start and end positions.
 
-```
+```sql
 array_slice(array, begin, end)
 ```
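
A usage sketch (not part of this patch); begin and end are 1-indexed and inclusive:

```sql
SELECT array_slice(make_array(1, 2, 3, 4, 5, 6, 7, 8), 3, 6);
-- [3, 4, 5, 6]
```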
 
@@ -3485,7 +3485,7 @@ array_slice(array, begin, end)
 
 Sorts the array.
 
-```
+```sql
 array_sort(array, desc, nulls_first)
 ```
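
An illustrative call (not part of this patch); 'DESC' is passed for the desc argument:

```sql
SELECT array_sort(make_array(3, 1, 2), 'DESC');
-- [3, 2, 1]
```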
 
@@ -3514,7 +3514,7 @@ array_sort(array, desc, nulls_first)
 
 Converts each element to its text representation.
 
-```
+```sql
 array_to_string(array, delimiter[, null_string])
 ```
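
Illustrative calls (not part of this patch); the second passes the optional null_string replacement:

```sql
SELECT array_to_string(make_array(1, 2, 3, 4), ', ');
-- '1, 2, 3, 4'
SELECT array_to_string(make_array(1, NULL, 3), ',', '*');
```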
 
@@ -3545,7 +3545,7 @@ array_to_string(array, delimiter[, null_string])
 
 Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.
 
-```
+```sql
 array_union(array1, array2)
 ```
 
@@ -3583,7 +3583,7 @@ _Alias of [array_has_any](#array_has_any)._
 
 Returns the total number of elements in the array.
 
-```
+```sql
 cardinality(array)
 ```
 
@@ -3606,7 +3606,7 @@ cardinality(array)
 
 Returns 1 for an empty array or 0 for a non-empty array.
 
-```
+```sql
 empty(array)
 ```
 
@@ -3639,7 +3639,7 @@ Converts an array of arrays to a flat array.
 
 The flattened array contains all the elements from all source arrays.
 
-```
+```sql
 flatten(array)
 ```
 
@@ -3662,7 +3662,7 @@ flatten(array)
 
 Similar to the range function, but it includes the upper bound.
 
-```
+```sql
 generate_series(start, stop, step)
 ```
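
For example (illustrative, not part of this patch), note the inclusive upper bound:

```sql
SELECT generate_series(1, 5);
-- [1, 2, 3, 4, 5]
```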
 
@@ -3847,7 +3847,7 @@ _Alias of [array_union](#array_union)._
 
 Returns an array using the specified input expressions.
 
-```
+```sql
 make_array(expression1[, ..., expression_n])
 ```
 
@@ -3878,7 +3878,7 @@ _Alias of [make_array](#make_array)._
 
 Returns an Arrow array between start and stop with step. The range start..stop contains all values with start <= x < stop. It is empty if start >= stop. Step cannot be 0.
 
-```
+```sql
 range(start, stop, step)
 ```
 
@@ -3910,7 +3910,7 @@ range(start, stop, step)
 
 Splits a string into an array of substrings based on a delimiter. Any substrings matching the optional `null_str` argument are replaced with NULL.
 
-```
+```sql
 string_to_array(str, delimiter[, null_str])
 ```
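
Illustrative calls (not part of this patch); the second shows the optional null_str substitution:

```sql
SELECT string_to_array('abc##def', '##');
-- ['abc', 'def']
SELECT string_to_array('abc def', ' ', 'def');
-- ['abc', NULL]
```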
 
@@ -3955,7 +3955,7 @@ _Alias of [string_to_array](#string_to_array)._
 
 Returns an Arrow struct using the specified name and input expressions pairs.
 
-```
+```sql
 named_struct(expression1_name, expression1_input[, ..., expression_n_name, expression_n_input])
 ```
 
@@ -3996,7 +3996,7 @@ Returns an Arrow struct using the specified input expressions optionally named.
 Fields in the returned struct use the optional name or the `cN` naming convention.
 For example: `c0`, `c1`, `c2`, etc.
 
-```
+```sql
 struct(expression1[, ..., expression_n])
 ```
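
Usage sketches (not part of this patch) showing the default cN field names and optional aliases:

```sql
SELECT struct(1, 'two');               -- fields named c0, c1
SELECT struct(1 AS id, 'two' AS name); -- fields named id, name
```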
 
@@ -4059,7 +4059,7 @@ Returns an Arrow map with the specified key-value pairs.
 
 The `make_map` function creates a map from two lists: one for keys and one for values. Each key must be unique and non-null.
 
-```
+```sql
 map(key, value)
 map(key: value)
 make_map(['key1', 'key2'], ['value1', 'value2'])
@@ -4106,7 +4106,7 @@ SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]);
 
 Returns a list containing the value for the given key or an empty list if the key is not present in the map.
 
-```
+```sql
 map_extract(map, key)
 ```
 
@@ -4139,7 +4139,7 @@ SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'y');
 
 Returns a list of all keys in the map.
 
-```
+```sql
 map_keys(map)
 ```
 
@@ -4163,7 +4163,7 @@ SELECT map_keys(map([100, 5], [42, 43]));
 
 Returns a list of all values in the map.
 
-```
+```sql
 map_values(map)
 ```
 
@@ -4196,7 +4196,7 @@ SELECT map_values(map([100, 5], [42, 43]));
 
 Computes the binary hash of an expression using the specified algorithm.
 
-```
+```sql
 digest(expression, algorithm)
 ```
 
@@ -4228,7 +4228,7 @@ digest(expression, algorithm)
 
 Computes an MD5 128-bit checksum for a string expression.
 
-```
+```sql
 md5(expression)
 ```
 
@@ -4251,7 +4251,7 @@ md5(expression)
 
 Computes the SHA-224 hash of a binary string.
 
-```
+```sql
 sha224(expression)
 ```
 
@@ -4274,7 +4274,7 @@ sha224(expression)
 
 Computes the SHA-256 hash of a binary string.
 
-```
+```sql
 sha256(expression)
 ```
 
@@ -4297,7 +4297,7 @@ sha256(expression)
 
 Computes the SHA-384 hash of a binary string.
 
-```
+```sql
 sha384(expression)
 ```
 
@@ -4320,7 +4320,7 @@ sha384(expression)
 
 Computes the SHA-512 hash of a binary string.
 
-```
+```sql
 sha512(expression)
 ```
 
@@ -4350,7 +4350,7 @@ sha512(expression)
 
 Casts a value to a specific Arrow data type.
 
-```
+```sql
 arrow_cast(expression, datatype)
 ```
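
Illustrative casts (not part of this patch) using Arrow data type names:

```sql
SELECT arrow_cast(-5, 'Int8');
SELECT arrow_cast('foo', 'Dictionary(Int32, Utf8)');
```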
 
@@ -4378,7 +4378,7 @@ arrow_cast(expression, datatype)
 
 Returns the name of the underlying [Arrow data type](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) of the expression.
 
-```
+```sql
 arrow_typeof(expression)
 ```
 
@@ -4404,7 +4404,7 @@ Note: most users invoke `get_field` indirectly via field access
 syntax such as `my_struct_col['field_name']` which results in a call to
 `get_field(my_struct_col, 'field_name')`.
 
-```
+```sql
 get_field(expression1, expression2)
 ```
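
A usage sketch (not part of this patch); `named_struct` is only used here to build an inline struct:

```sql
SELECT get_field(named_struct('id', 1, 'name', 'alice'), 'name');
-- equivalent field access syntax: <struct expression>['name']
```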
 
@@ -4444,7 +4444,7 @@ get_field(expression1, expression2)
 
 Returns the version of DataFusion.
 
-```
+```sql
 version()
 ```
 
diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md
index a68fdbda6709..1c02804f0dee 100644
--- a/docs/source/user-guide/sql/window_functions.md
+++ b/docs/source/user-guide/sql/window_functions.md
@@ -115,7 +115,7 @@ WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);
 
 The syntax for the OVER-clause is
 
-```
+```sql
 function([expr])
   OVER(
     [PARTITION BY expr[, …]]
@@ -126,7 +126,7 @@ function([expr])
 
 where **frame_clause** is one of:
 
-```
+```sql
   { RANGE | ROWS | GROUPS } frame_start
   { RANGE | ROWS | GROUPS } BETWEEN frame_start AND frame_end
 ```
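
A hedged example (not part of this patch), assuming the `empsalary` table used in the examples earlier on this page:

```sql
SELECT depname,
       salary,
       avg(salary) OVER (
         PARTITION BY depname
         ORDER BY salary
         ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
       ) AS rolling_avg
FROM empsalary;
```
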
@@ -162,7 +162,7 @@ All [aggregate functions](aggregate_functions.md) can be used as window function
 
 Relative rank of the current row: (number of rows preceding or peer with current row) / (total rows).
 
-```
+```sql
 cume_dist()
 ```
 
@@ -170,7 +170,7 @@ cume_dist()
 
 Returns the rank of the current row without gaps. This function ranks rows in a dense manner: rows with equal values receive the same rank, and ranks are assigned consecutively without gaps.
 
-```
+```sql
 dense_rank()
 ```
 
@@ -178,7 +178,7 @@ dense_rank()
 
 Integer ranging from 1 to the argument value, dividing the partition as equally as possible.
 
-```
+```sql
 ntile(expression)
 ```
 
@@ -190,7 +190,7 @@ ntile(expression)
 
 Returns the percentage rank of the current row within its partition. The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.
 
-```
+```sql
 percent_rank()
 ```
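
A worked sketch (not part of this patch), assuming the `empsalary` example table: with 5 rows in the partition, the row ranked 3 gets `(3 - 1) / (5 - 1) = 0.5`.

```sql
SELECT salary,
       percent_rank() OVER (ORDER BY salary) AS pct
FROM empsalary;
```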
 
@@ -198,7 +198,7 @@ percent_rank()
 
 Returns the rank of the current row within its partition, allowing gaps between ranks: rows with equal values receive the same rank, and subsequent ranks are skipped accordingly (unlike `row_number`, which numbers every row).
 
-```
+```sql
 rank()
 ```
 
@@ -206,7 +206,7 @@ rank()
 
 Number of the current row within its partition, counting from 1.
 
-```
+```sql
 row_number()
 ```
 
@@ -222,7 +222,7 @@ row_number()
 
 Returns the value evaluated at the row that is the first row of the window frame.
 
-```
+```sql
 first_value(expression)
 ```
 
@@ -234,7 +234,7 @@ first_value(expression)
 
 Returns the value evaluated at the row that is offset rows before the current row within the partition; if there is no such row, returns default instead (which must be of the same type as value).
 
-```
+```sql
 lag(expression, offset, default)
 ```
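
An illustrative query (not part of this patch), again assuming the `empsalary` example table:

```sql
-- previous salary within each department, defaulting to 0 for the first row
SELECT depname,
       salary,
       lag(salary, 1, 0) OVER (PARTITION BY depname ORDER BY salary) AS prev_salary
FROM empsalary;
```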
 
@@ -248,7 +248,7 @@ lag(expression, offset, default)
 
 Returns the value evaluated at the row that is the last row of the window frame.
 
-```
+```sql
 last_value(expression)
 ```
 
@@ -260,7 +260,7 @@ last_value(expression)
 
 Returns the value evaluated at the row that is offset rows after the current row within the partition; if there is no such row, returns default instead (which must be of the same type as value).
 
-```
+```sql
 lead(expression, offset, default)
 ```
 
@@ -274,7 +274,7 @@ lead(expression, offset, default)
 
 Returns the value evaluated at the row that is the nth row of the window frame (counting from 1); returns null if there is no such row.
 
-```
+```sql
 nth_value(expression, n)
 ```