From 7d08f7ff940302e73f53b641228156f3bbb9d4aa Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 10:06:20 -0700 Subject: [PATCH 01/20] bump DataFusion to rev 5592834 --- native/Cargo.lock | 195 +++++++++--------- native/Cargo.toml | 30 +-- native/core/src/execution/operators/filter.rs | 17 +- native/core/src/execution/operators/scan.rs | 6 +- native/core/src/execution/planner.rs | 5 +- native/core/src/execution/shuffle/row.rs | 16 +- 6 files changed, 131 insertions(+), 138 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 69f6fe97a..5215970dc 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -123,9 +123,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91839b07e474b3995035fd8ac33ee54f9c9ccbbb1ea33d9909c71bffdf1259d" +checksum = "d2ccdcc8fb14508ca20aaec7076032e5c0b0751b906036d4496786e2f227a37a" dependencies = [ "arrow-arith", "arrow-array", @@ -144,24 +144,23 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "855c57c4efd26722b044dcd3e348252560e3e0333087fb9f6479dc0bf744054f" +checksum = "a1aad8e27f32e411a0fc0bf5a625a35f0bf9b9f871cf4542abe31f7cef4beea2" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", - "half", "num", ] [[package]] name = "arrow-array" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f" +checksum = "bd6ed90c28c6f73a706c55799b8cc3a094e89257238e5b1d65ca7c70bd3ae23f" dependencies = [ "ahash", "arrow-buffer", @@ -176,9 +175,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e4a9b9b1d6d7117f6138e13bc4dd5daa7f94e671b70e8c9c4dc37b4f5ecfc16" +checksum = "fe4a40bdc1552ea10fbdeae4e5a945d8572c32f66bce457b96c13d9c46b80447" dependencies = [ "bytes", "half", @@ -187,9 +186,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc70e39916e60c5b7af7a8e2719e3ae589326039e1e863675a008bee5ffe90fd" +checksum = "430c0a21aa7f81bcf0f97c57216d7127795ea755f494d27bae2bd233be43c2cc" dependencies = [ "arrow-array", "arrow-buffer", @@ -208,28 +207,25 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "789b2af43c1049b03a8d088ff6b2257cdcea1756cd76b174b1f2600356771b97" +checksum = "b4444c8f8c57ac00e6a679ede67d1ae8872c170797dc45b46f75702437a77888" dependencies = [ "arrow-array", - "arrow-buffer", "arrow-cast", - "arrow-data", "arrow-schema", "chrono", "csv", "csv-core", "lazy_static", - "lexical-core", "regex", ] [[package]] name = "arrow-data" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4e75edf21ffd53744a9b8e3ed11101f610e7ceb1a29860432824f1834a1f623" +checksum = "09af476cfbe9879937e50b1334c73189de6039186e025b1b1ac84b283b87b20e" dependencies = [ "arrow-buffer", "arrow-schema", @@ -239,13 +235,12 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = 
"53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d186a909dece9160bf8312f5124d797884f608ef5435a36d9d608e0b2a9bcbf8" +checksum = "136296e8824333a8a4c4a6e508e4aa65d5678b801246d0408825ae7b2523c628" dependencies = [ "arrow-array", "arrow-buffer", - "arrow-cast", "arrow-data", "arrow-schema", "flatbuffers", @@ -254,9 +249,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b66ff2fedc1222942d0bd2fd391cb14a85baa3857be95c9373179bd616753b85" +checksum = "e222ad0e419ab8276818c5605a5bb1e35ed86fa8c5e550726433cc63b09c3c78" dependencies = [ "arrow-array", "arrow-buffer", @@ -274,26 +269,23 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ece7b5bc1180e6d82d1a60e1688c199829e8842e38497563c3ab6ea813e527fd" +checksum = "eddf14c5f03b679ec8ceac4dfac43f63cdc4ed54dab3cc120a4ef46af38481eb" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "half", - "num", ] [[package]] name = "arrow-row" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff" +checksum = "e9acdc58da19f383f4ba381fa0e3583534ae2ceb31269aaf4a03f08ff13e8443" dependencies = [ - "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -303,18 +295,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b95513080e728e4cec37f1ff5af4f12c9688d47795d17cda80b6ec2cf74d4678" +checksum = "3a1822a1a952955637e85e8f9d6b0e04dd75d65492b87ec548dd593d3a1f772b" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "arrow-select" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722" +checksum = "5c4172e9a12dfe15303d3926269f9ead471ea93bdd067d113abc65cb6c48e246" dependencies = [ "ahash", "arrow-array", @@ -326,9 +318,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d956cae7002eb8d83a27dbd34daaea1cf5b75852f0b84deb4d93a276e92bbf" +checksum = "73683040445f4932342781926189901c9521bb1a787c35dbe628a3ce51372d3c" dependencies = [ "arrow-array", "arrow-buffer", @@ -828,8 +820,7 @@ dependencies = [ [[package]] name = "datafusion" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "014fc8c384ecacedaabb3bc8359c2a6c6e9d8f7bea65be3434eccacfc37f52d9" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", @@ -856,7 +847,7 @@ dependencies = [ "datafusion-sql", "futures", "glob", - "itertools 0.13.0", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -873,8 +864,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee60d33e210ef96070377ae667ece7caa0e959c8387496773d4a1a72f1a5012e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ 
"arrow-schema", "async-trait", @@ -974,14 +964,15 @@ dependencies = [ [[package]] name = "datafusion-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b42b7d720fe21ed9cca2ebb635f3f13a12cfab786b41e0fba184fb2e620525b" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", + "arrow-ipc", "arrow-schema", + "base64", "half", "hashbrown 0.14.5", "indexmap", @@ -998,8 +989,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72fbf14d4079f7ce5306393084fe5057dddfdc2113577e0049310afa12e94281" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "log", "tokio", @@ -1008,14 +998,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c278dbd64860ed0bb5240fc1f4cb6aeea437153910aea69bcf7d5a8d6d0454f3" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" [[package]] name = "datafusion-execution" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22cb02af47e756468b3cbfee7a83e3d4f2278d452deb4b033ba933c75169486" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "dashmap", @@ -1033,8 +1021,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62298eadb1d15b525df1315e61a71519ffc563d41d5c3b2a30fda2d70f77b93c" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "chrono", @@ -1053,19 +1040,17 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dda7f73c5fc349251cd3dcb05773c5bf55d2505a698ef9d38dfc712161ea2f55" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "datafusion-common", - "itertools 0.13.0", + "itertools 0.14.0", ] [[package]] name = "datafusion-functions" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd197f3b2975424d3a4898ea46651be855a46721a56727515dbd5c9e2fb597da" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-buffer", @@ -1081,7 +1066,7 @@ dependencies = [ "datafusion-macros", "hashbrown 0.14.5", "hex", - "itertools 0.13.0", + "itertools 0.14.0", "log", "md-5", "rand", @@ -1094,11 +1079,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabbe48fba18f9981b134124381bee9e46f93518b8ad2f9721ee296cef5affb9" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", + "arrow-buffer", "arrow-schema", "datafusion-common", "datafusion-doc", @@ -1116,8 +1101,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "44.0.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a3fefed9c8c11268d446d924baca8cabf52fe32f73fdaa20854bac6473590c" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1129,8 +1113,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6360f27464fab857bec698af39b2ae331dc07c8bf008fb4de387a19cdc6815a5" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", @@ -1138,12 +1121,14 @@ dependencies = [ "arrow-ord", "arrow-schema", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-macros", "datafusion-physical-expr-common", - "itertools 0.13.0", + "itertools 0.14.0", "log", "paste", ] @@ -1151,8 +1136,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c35c070eb705c12795dab399c3809f4dfbc290678c624d3989490ca9b8449c1" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "async-trait", @@ -1167,8 +1151,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52229bca26b590b140900752226c829f15fc1a99840e1ca3ce1a9534690b82a8" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "datafusion-common", "datafusion-doc", @@ -1184,8 +1167,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "367befc303b64a668a10ae6988a064a9289e1999e71a7f8e526b6e14d6bdd9d6" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1194,9 +1176,9 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5de3c8f386ea991696553afe241a326ecbc3c98a12c562867e4be754d3a060c" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ + "datafusion-expr", "quote", "syn 2.0.93", ] @@ -1204,8 +1186,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b520413906f755910422b016fb73884ae6e9e1b376de4f9584b6c0e031da75" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "chrono", @@ -1213,7 +1194,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "regex", "regex-syntax", @@ -1222,8 +1203,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd6ddc378f6ad19af95ccd6790dec8f8e1264bc4c70e99ddc1830c1a1c78ccd" +source = 
"git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1238,47 +1218,50 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "paste", - "petgraph", + "petgraph 0.7.1", ] [[package]] name = "datafusion-physical-expr-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e6c05458eccd74b4c77ed6a1fe63d52434240711de7f6960034794dad1caf5" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools 0.13.0", + "itertools 0.14.0", ] [[package]] name = "datafusion-physical-optimizer" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dc3a82190f49c37d377f31317e07ab5d7588b837adadba8ac367baad5dc2351" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", + "arrow-schema", "datafusion-common", "datafusion-execution", + "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools 0.13.0", + "futures", + "itertools 0.14.0", "log", + "url", ] [[package]] name = "datafusion-physical-plan" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6608bc9844b4ddb5ed4e687d173e6c88700b1d0482f43894617d18a1fe75da" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1299,7 +1282,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1309,8 +1292,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a884061c79b33d0c8e84a6f4f4be8bdc12c0f53f5af28ddf5d6d95ac0b15fdc" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", @@ -1438,6 +1420,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flatbuffers" version = "24.12.23" @@ -1907,6 +1895,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -2361,9 +2358,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.3.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191" +checksum = "3334c50239d9f4951653d84fa6f636da86f53742e5e5849a30fbe852f3ff4383" dependencies = [ "ahash", "arrow-array", @@ -2422,7 +2419,17 @@ version = "0.6.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset", + "fixedbitset 0.4.2", + "indexmap", +] + +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset 0.5.7", "indexmap", ] @@ -2582,7 +2589,7 @@ dependencies = [ "lazy_static", "log", "multimap", - "petgraph", + "petgraph 0.6.5", "prost 0.9.0", "prost-types", "regex", diff --git a/native/Cargo.toml b/native/Cargo.toml index 624d63ad2..2c10e1d86 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -33,21 +33,21 @@ edition = "2021" rust-version = "1.79" [workspace.dependencies] -arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] } -arrow-array = { version = "53.3.0" } -arrow-buffer = { version = "53.3.0" } -arrow-data = { version = "53.3.0" } -arrow-schema = { version = "53.3.0" } -parquet = { version = "53.3.0", default-features = false, features = ["experimental"] } -datafusion = { version = "44.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -datafusion-common = { version = "44.0.0", default-features = false } -datafusion-functions = { version = "44.0.0", default-features = false, features = ["crypto_expressions"] } -datafusion-functions-nested = { version = "44.0.0", default-features = false } -datafusion-expr = { version = "44.0.0", default-features = false } -datafusion-expr-common = { version = "44.0.0", default-features = false } -datafusion-execution = { version = "44.0.0", default-features = false } -datafusion-physical-plan = { version = "44.0.0", default-features = false } -datafusion-physical-expr = { version = "44.0.0", default-features = false } +arrow = { version = "54.0.0", features = ["prettyprint", "ffi", "chrono-tz"] } +arrow-array = { version = "54.0.0" } +arrow-buffer = { version = "54.0.0" } +arrow-data = { version = "54.0.0" } +arrow-schema = { version = "54.0.0" } +parquet = { version = "54.0.0", default-features = false, features = ["experimental"] } +datafusion = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +datafusion-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" } datafusion-comet-proto = { path = "proto", version = "0.6.0" } chrono = { version = "0.4", default-features = false, features = 
["clock"] } diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 0312f869e..67da8fce9 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -41,7 +41,8 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, + PhysicalExpr, }; use futures::stream::{Stream, StreamExt}; @@ -168,11 +169,15 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - res_constants - .push(ConstExpr::from(binary.right()).with_across_partitions(true)) + res_constants.push( + ConstExpr::from(binary.right()) + .with_across_partitions(AcrossPartitions::Heterogeneous), + ) } else if input_eqs.is_expr_constant(binary.right()) { - res_constants - .push(ConstExpr::from(binary.left()).with_across_partitions(true)) + res_constants.push( + ConstExpr::from(binary.left()) + .with_across_partitions(AcrossPartitions::Heterogeneous), + ) } } } @@ -200,7 +205,7 @@ impl FilterExec { .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| { let expr = Arc::new(column) as _; - ConstExpr::new(expr).with_across_partitions(true) + ConstExpr::new(expr).with_across_partitions(AcrossPartitions::Heterogeneous) }); // this is for statistics eq_properties = eq_properties.with_constants(constants); diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 888cd2fdb..81dfccefb 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -304,11 +304,7 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef { .map(|(idx, c)| { let datatype = ScanExec::unpack_dictionary_type(c.data_type()); // We don't use the field name. Put a placeholder. 
- if matches!(datatype, DataType::Dictionary(_, _)) { - Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false) - } else { - Field::new(format!("col_{}", idx), datatype, true) - } + Field::new(format!("col_{}", idx), datatype, true) }) .collect::<Vec<Field>>() } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c7df503a7..fa0a86926 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -53,7 +53,6 @@ use datafusion::{ }, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, - physical_optimizer::join_selection::swap_hash_join, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec}, @@ -1467,7 +1466,7 @@ impl PhysicalPlanner { )) } else { let swapped_hash_join = - swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?; + hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?; let mut additional_native_plans = vec![]; if swapped_hash_join.as_any().is::<ProjectionExec>() { @@ -1655,7 +1654,7 @@ impl PhysicalPlanner { Some(JoinFilter::new( rewritten_physical_expr, column_indices, - filter_schema, + filter_schema.into(), )) } else { None diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 54a9bb31f..f9ecf4790 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3435,24 +3435,10 @@ fn builder_to_array( } fn make_batch(arrays: Vec<ArrayRef>, row_count: usize) -> Result<RecordBatch, ArrowError> { - let mut dict_id = 0; let fields = arrays .iter() .enumerate() - .map(|(i, array)| match array.data_type() { - DataType::Dictionary(_, _) => { - let field = Field::new_dict( - format!("c{}", i), - array.data_type().clone(), - true, - dict_id, - false, - ); - dict_id += 1; - field - } - _ => Field::new(format!("c{}", i), array.data_type().clone(), true), - }) + .map(|(i, array)| Field::new(format!("c{}", i), array.data_type().clone(), true)) .collect::<Vec<_>>(); let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); From 735ff00395e415cd678489bddc2accff57ede6b1 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 11:35:49 -0700 Subject: [PATCH 02/20] update FilterExec --- native/core/src/execution/operators/filter.rs | 162 ++++++++++++++---- 1 file changed, 133 insertions(+), 29 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 67da8fce9..1396bf645 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -32,11 +32,14 @@ use arrow::record_batch::RecordBatch; use arrow_array::{make_array, Array, ArrayRef, BooleanArray, RecordBatchOptions}; use arrow_data::transform::MutableArrayData; use arrow_schema::ArrowError; +use datafusion::physical_plan::common::can_project; +use datafusion::physical_plan::execution_plan::CardinalityEffect; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; -use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; +use datafusion_common::{internal_err, plan_err, project_schema, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; +use datafusion_physical_expr::equivalence::ProjectionMapping; use 
datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::collect_columns;
@@ -44,14 +47,13 @@ use datafusion_physical_expr::{
     analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries,
     PhysicalExpr,
 };
-
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
 /// This is a copy of DataFusion's FilterExec with one modification to ensure that input
 /// batches are never passed through unchanged. The changes are between the comments
 /// `BEGIN Comet change` and `END Comet change`.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FilterExec {
     /// The expression to filter on. This expression must evaluate to a boolean value.
     predicate: Arc<dyn PhysicalExpr>,
@@ -63,6 +65,8 @@ pub struct FilterExec {
     default_selectivity: u8,
     /// Properties equivalence properties, partitioning, etc.
     cache: PlanProperties,
+    /// The projection indices of the columns in the output schema
+    projection: Option<Vec<usize>>,
 }
 
 impl FilterExec {
@@ -74,13 +78,15 @@ impl FilterExec {
         match predicate.data_type(input.schema().as_ref())? {
             DataType::Boolean => {
                 let default_selectivity = 20;
-                let cache = Self::compute_properties(&input, &predicate, default_selectivity)?;
+                let cache =
+                    Self::compute_properties(&input, &predicate, default_selectivity, None)?;
                 Ok(Self {
                     predicate,
                     input: Arc::clone(&input),
                     metrics: ExecutionPlanMetricsSet::new(),
                     default_selectivity,
                     cache,
+                    projection: None,
                 })
             }
             other => {
@@ -102,6 +108,35 @@ impl FilterExec {
         Ok(self)
     }
 
+    /// Return a new instance of [FilterExec] with the given projection.
+    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
+        // Check if the projection is valid
+        can_project(&self.schema(), projection.as_ref())?;
+
+        let projection = match projection {
+            Some(projection) => match &self.projection {
+                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
+                None => Some(projection),
+            },
+            None => None,
+        };
+
+        let cache = Self::compute_properties(
+            &self.input,
+            &self.predicate,
+            self.default_selectivity,
+            projection.as_ref(),
+        )?;
+        Ok(Self {
+            predicate: Arc::clone(&self.predicate),
+            input: Arc::clone(&self.input),
+            metrics: self.metrics.clone(),
+            default_selectivity: self.default_selectivity,
+            cache,
+            projection,
+        })
+    }
+
     /// The expression to filter on. This expression must evaluate to a boolean value.
     pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
         &self.predicate
@@ -117,6 +152,11 @@ impl FilterExec {
         self.default_selectivity
     }
 
+    /// Projection
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
     /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. 
fn statistics_helper(
         input: &Arc<dyn ExecutionPlan>,
@@ -169,15 +209,21 @@ impl FilterExec {
                 if binary.op() == &Operator::Eq {
                     // Filter evaluates to single value for all partitions
                     if input_eqs.is_expr_constant(binary.left()) {
+                        let (expr, across_parts) = (
+                            binary.right(),
+                            input_eqs.get_expr_constant_value(binary.right()),
+                        );
                         res_constants.push(
-                            ConstExpr::from(binary.right())
-                                .with_across_partitions(AcrossPartitions::Heterogeneous),
-                        )
+                            ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
+                        );
                     } else if input_eqs.is_expr_constant(binary.right()) {
+                        let (expr, across_parts) = (
+                            binary.left(),
+                            input_eqs.get_expr_constant_value(binary.left()),
+                        );
                         res_constants.push(
-                            ConstExpr::from(binary.left())
-                                .with_across_partitions(AcrossPartitions::Heterogeneous),
-                        )
+                            ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
+                        );
                     }
                 }
@@ -189,6 +235,7 @@ impl FilterExec {
         input: &Arc<dyn ExecutionPlan>,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
+        projection: Option<&Vec<usize>>,
     ) -> Result<PlanProperties> {
         // Combine the equal predicates with the input equivalence properties
         // to construct the equivalence properties:
@@ -204,17 +251,32 @@ impl FilterExec {
             .into_iter()
             .filter(|column| stats.column_statistics[column.index()].is_singleton())
             .map(|column| {
+                let value = stats.column_statistics[column.index()]
+                    .min_value
+                    .get_value();
                 let expr = Arc::new(column) as _;
-                ConstExpr::new(expr).with_across_partitions(AcrossPartitions::Heterogeneous)
+                ConstExpr::new(expr)
+                    .with_across_partitions(AcrossPartitions::Uniform(value.cloned()))
             });
-        // this is for statistics
+        // This is for statistics
         eq_properties = eq_properties.with_constants(constants);
-        // this is for logical constant (for example: a = '1', then a could be marked as a constant)
+        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
         // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
         eq_properties = eq_properties.with_constants(Self::extend_constants(input, predicate));
+
+        let mut output_partitioning = input.output_partitioning().clone();
+        // If a projection is present, update the PlanProperties. 
+ if let Some(projection) = projection { + let schema = eq_properties.schema(); + let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; + let out_schema = project_schema(schema, Some(projection))?; + output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); + eq_properties = eq_properties.project(&projection_mapping, out_schema); + } + Ok(PlanProperties::new( eq_properties, - input.output_partitioning().clone(), // Output Partitioning + output_partitioning, input.pipeline_behavior(), input.boundedness(), )) @@ -225,7 +287,23 @@ impl DisplayAs for FilterExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FilterExec: {}", self.predicate) + let display_projections = if let Some(projection) = self.projection.as_ref() { + format!( + ", projection=[{}]", + projection + .iter() + .map(|index| format!( + "{}@{}", + self.input.schema().fields().get(*index).unwrap().name(), + index + )) + .collect::<Vec<_>>() + .join(", ") + ) + } else { + "".to_string() + }; + write!(f, "FilterExec: {}{}", self.predicate, display_projections) } } } @@ -233,7 +311,7 @@ impl DisplayAs for FilterExec { impl ExecutionPlan for FilterExec { fn name(&self) -> &'static str { - "CometFilterExec" + "FilterExec" } /// Return a reference to Any that can be used for downcasting @@ -250,7 +328,7 @@ impl ExecutionPlan for FilterExec { } fn maintains_input_order(&self) -> Vec<bool> { - // tell optimizer this operator doesn't reorder its input + // Tell optimizer this operator doesn't reorder its input vec![true] } @@ -263,6 +341,7 @@ impl ExecutionPlan for FilterExec { let selectivity = e.default_selectivity(); e.with_default_selectivity(selectivity) }) + .and_then(|e| e.with_projection(self.projection().cloned())) .map(|e| Arc::new(e) as _) } @@ -279,10 +358,11 @@ impl ExecutionPlan for FilterExec { ); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { - schema: self.input.schema(), + schema: self.schema(), predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, + projection: self.projection.clone(), })) } @@ -293,7 +373,13 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn statistics(&self) -> Result<Statistics> { - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) + let stats = + Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?; + Ok(stats.project(self.projection.as_ref())) + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::LowerEqual } } @@ -337,28 +423,41 @@ fn collect_new_statistics( /// The FilterExec streams wraps the input iterator and applies the predicate expression to /// determine which rows to include in its output batches struct FilterExecStream { - /// Output schema, which is the same as the input schema for this operator + /// Output schema after the projection schema: SchemaRef, /// The expression to filter on. This expression must evaluate to a boolean value. predicate: Arc<dyn PhysicalExpr>, /// The input partition to filter. 
input: SendableRecordBatchStream, - /// runtime metrics recording + /// Runtime metrics recording baseline_metrics: BaselineMetrics, + /// The projection indices of the columns in the input schema + projection: Option<Vec<usize>>, } -pub(crate) fn batch_filter( +fn filter_and_project( batch: &RecordBatch, predicate: &Arc<dyn PhysicalExpr>, + projection: Option<&Vec<usize>>, + output_schema: &SchemaRef, ) -> Result<RecordBatch> { predicate .evaluate(batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match as_boolean_array(&array) { - // apply filter array to record batch - Ok(filter_array) => comet_filter_record_batch(batch, filter_array)?, - Err(_) => { + Ok(match (as_boolean_array(&array), projection) { + // Apply filter array to record batch + (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?, + (Ok(filter_array), Some(projection)) => { + let projected_columns = projection + .iter() + .map(|i| Arc::clone(batch.column(*i))) + .collect(); + let projected_batch = + RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?; + comet_filter_record_batch(&projected_batch, filter_array)? + } + (Err(_), _) => { return internal_err!("Cannot create filter_array from non-boolean predicates"); } }) @@ -402,9 +501,14 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = batch_filter(&batch, &self.predicate)?; + let filtered_batch = filter_and_project( + &batch, + &self.predicate, + self.projection.as_ref(), + &self.schema, + )?; timer.done(); - // skip entirely filtered batches + // Skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } @@ -421,7 +525,7 @@ impl Stream for FilterExecStream { } fn size_hint(&self) -> (usize, Option<usize>) { - // same number of record batches + // Same number of record batches self.input.size_hint() } } From dee1b6520b4c615c6b15a48839b64821cf829867 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 12:00:00 -0700 Subject: [PATCH 03/20] fix regression --- native/core/src/execution/operators/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 1396bf645..0cbbef061 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -303,7 +303,7 @@ impl DisplayAs for FilterExec { } else { "".to_string() }; - write!(f, "FilterExec: {}{}", self.predicate, display_projections) + write!(f, "CometFilterExec: {}{}", self.predicate, display_projections) } } } @@ -311,7 +311,7 @@ impl DisplayAs for FilterExec { impl ExecutionPlan for FilterExec { fn name(&self) -> &'static str { - "FilterExec" + "CometFilterExec" } /// Return a reference to Any that can be used for downcasting From 30270e7b1d28a162da0ffbfcaa7aa7ce5b65367e Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 12:26:48 -0700 Subject: [PATCH 04/20] fmt --- native/core/src/execution/operators/filter.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 0cbbef061..7f042399f 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -303,7 +303,11 @@ impl DisplayAs for FilterExec { } else { 
"".to_string() }; - write!(f, "CometFilterExec: {}{}", self.predicate, display_projections) + write!( + f, + "CometFilterExec: {}{}", + self.predicate, display_projections + ) } } } From b11d2e70593c7e77e560b44640de61eca14f366a Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 13:58:25 -0700 Subject: [PATCH 05/20] revert change --- native/core/src/execution/operators/scan.rs | 7 ++++++- native/core/src/execution/shuffle/row.rs | 17 ++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 81dfccefb..2110ad89c 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -304,7 +304,12 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef { .map(|(idx, c)| { let datatype = ScanExec::unpack_dictionary_type(c.data_type()); // We don't use the field name. Put a placeholder. - Field::new(format!("col_{}", idx), datatype, true) + if matches!(datatype, DataType::Dictionary(_, _)) { + #[allow(deprecated)] + Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false) + } else { + Field::new(format!("col_{}", idx), datatype, true) + } }) .collect::<Vec<Field>>() } diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index f9ecf4790..3f25b46c9 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3435,10 +3435,25 @@ fn builder_to_array( } fn make_batch(arrays: Vec<ArrayRef>, row_count: usize) -> Result<RecordBatch, ArrowError> { + let mut dict_id = 0; let fields = arrays .iter() .enumerate() - .map(|(i, array)| Field::new(format!("c{}", i), array.data_type().clone(), true)) + .map(|(i, array)| match array.data_type() { + DataType::Dictionary(_, _) => { + #[allow(deprecated)] + let field = Field::new_dict( + format!("c{}", i), + array.data_type().clone(), + true, + dict_id, + false, + ); + dict_id += 1; + field + } + _ => Field::new(format!("c{}", i), array.data_type().clone(), true), + }) .collect::<Vec<_>>(); let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); From e081fed844807823520e57a077b83bc5e9a90cb5 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 15:07:27 -0700 Subject: [PATCH 06/20] fix regression --- native/core/src/execution/operators/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 7f042399f..c8ca24077 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -451,7 +451,7 @@ fn filter_and_project( .and_then(|array| { Ok(match (as_boolean_array(&array), projection) { // Apply filter array to record batch - (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?, + (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?, (Ok(filter_array), Some(projection)) => { let projected_columns = projection .iter() From f2c1409f529e3f5777f275493278d23bae0533ae Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Thu, 23 Jan 2025 16:21:32 -0700 Subject: [PATCH 07/20] fix --- native/core/src/execution/planner.rs | 33 ++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git 
a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index fa0a86926..1365e4677 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -104,7 +104,7 @@ use datafusion_common::{ }; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{ - AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, + AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_nested::array_has::ArrayHas; @@ -2269,16 +2269,31 @@ impl PhysicalPlanner { .coerce_types(&input_expr_types) .unwrap_or_else(|_| input_expr_types.clone()); - let data_type = match fun_name { - // workaround for https://github.com/apache/datafusion/issues/13716 - "datepart" => DataType::Int32, - _ => { - // TODO need to call `return_type_from_exprs` instead - #[allow(deprecated)] - func.inner().return_type(&coerced_types)? - } + // TODO this should try and find scalar + let arguments = args + .iter() + .map(|e| { + e.as_ref() + .as_any() + .downcast_ref::<Literal>() + .map(|lit| lit.value()) + }) + .collect::<Vec<_>>(); + + let nullables = arguments.iter().map(|_| true).collect::<Vec<_>>(); + + let args = ReturnTypeArgs { + arg_types: &coerced_types, + scalar_arguments: &arguments, + nullables: &nullables, }; + let data_type = func + .inner() + .return_type_from_args(args)? + .return_type() + .clone(); + (data_type, coerced_types) } }; From 6400197eef64097324322379e08da635e6df5785 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Fri, 24 Jan 2025 13:06:45 -0700 Subject: [PATCH 08/20] use temp datafusion branch --- native/Cargo.lock | 44 ++++++++++++++++++++++---------------------- native/Cargo.toml | 33 +++++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 5215970dc..e8a291136 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -820,7 +820,7 @@ dependencies = [ [[package]] name = "datafusion" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "arrow-array", @@ -864,7 +864,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow-schema", "async-trait", @@ -964,7 +964,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -989,7 +989,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "log", "tokio", @@ -998,12 +998,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "44.0.0" -source = 
"git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" [[package]] name = "datafusion-execution" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "dashmap", @@ -1021,7 +1021,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "chrono", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "datafusion-common", @@ -1050,7 +1050,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "arrow-buffer", @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -1101,7 +1101,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -1113,7 +1113,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "arrow-array", @@ -1136,7 +1136,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "async-trait", @@ -1151,7 +1151,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "datafusion-common", "datafusion-doc", @@ -1167,7 +1167,7 @@ dependencies = [ [[package]] name = 
"datafusion-functions-window-common" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1176,7 +1176,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "datafusion-expr", "quote", @@ -1186,7 +1186,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "chrono", @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -1227,7 +1227,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -1240,7 +1240,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "arrow-schema", @@ -1261,7 +1261,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "ahash", "arrow", @@ -1292,7 +1292,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "44.0.0" -source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" +source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" dependencies = [ "arrow", "arrow-array", diff --git a/native/Cargo.toml b/native/Cargo.toml index 2c10e1d86..b5607bd2d 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -22,7 +22,7 @@ resolver = "2" [workspace.package] version = "0.6.0" homepage = "https://datafusion.apache.org/comet" -repository = "https://github.com/apache/datafusion-comet" +repository = "https://github.com/andygrove/datafusion-comet" authors = ["Apache DataFusion <dev@datafusion.apache.org>"] description = "Apache DataFusion Comet: High performance accelerator for Apache Spark" readme = "README.md" @@ -39,15 +39,28 @@ arrow-buffer = { version = "54.0.0" } arrow-data = { version = "54.0.0" } arrow-schema = { version 
= "54.0.0" } parquet = { version = "54.0.0", default-features = false, features = ["experimental"] } -datafusion = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -datafusion-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["crypto_expressions"] } -datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } + +datafusion = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +datafusion-common = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-functions = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false, features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-expr = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-expr-common = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-execution = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-physical-plan = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } +datafusion-physical-expr = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } + +# temp for testing DataFusion changes locally +#datafusion = { path = "/home/andy/git/apache/datafusion/datafusion/core", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +#datafusion-common = { path = "/home/andy/git/apache/datafusion/datafusion/common", default-features = false } +#datafusion-functions = { path = "/home/andy/git/apache/datafusion/datafusion/functions", default-features = false, features = ["crypto_expressions"] } +#datafusion-functions-nested = { path = "/home/andy/git/apache/datafusion/datafusion/functions-nested", default-features = false } +#datafusion-expr = { path = "/home/andy/git/apache/datafusion/datafusion/expr", default-features = false } +#datafusion-expr-common = { path = "/home/andy/git/apache/datafusion/datafusion/expr-common", default-features = false } +#datafusion-execution = { path = "/home/andy/git/apache/datafusion/datafusion/execution", default-features = false } +#datafusion-physical-plan = { path = "/home/andy/git/apache/datafusion/datafusion/physical-plan", default-features = false } 
+#datafusion-physical-expr = { path = "/home/andy/git/apache/datafusion/datafusion/physical-expr", default-features = false } + datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" } datafusion-comet-proto = { path = "proto", version = "0.6.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } From 0ee0b4ce4095b5aa6111aeed9202e0cc1c8a7704 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Fri, 24 Jan 2025 14:25:26 -0700 Subject: [PATCH 09/20] try removing Field::new_with_dict --- native/core/src/execution/operators/scan.rs | 7 +------ native/core/src/execution/shuffle/row.rs | 16 +--------------- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2110ad89c..81dfccefb 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -304,12 +304,7 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef { .map(|(idx, c)| { let datatype = ScanExec::unpack_dictionary_type(c.data_type()); // We don't use the field name. Put a placeholder. - if matches!(datatype, DataType::Dictionary(_, _)) { - #[allow(deprecated)] - Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false) - } else { - Field::new(format!("col_{}", idx), datatype, true) - } + Field::new(format!("col_{}", idx), datatype, true) }) .collect::<Vec<Field>>() } diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 3f25b46c9..a373d157a 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3439,21 +3439,7 @@ fn make_batch(arrays: Vec<ArrayRef>, row_count: usize) -> Result<RecordBatch, Ar let fields = arrays .iter() .enumerate() - .map(|(i, array)| match array.data_type() { - DataType::Dictionary(_, _) => { - #[allow(deprecated)] - let field = Field::new_dict( - format!("c{}", i), - array.data_type().clone(), - true, - dict_id, - false, - ); - dict_id += 1; - field - } - _ => Field::new(format!("c{}", i), array.data_type().clone(), true), - }) + .map(|(i, array)| Field::new(format!("c{}", i), array.data_type().clone(), true)) .collect::<Vec<_>>(); let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); From eb7ddef9bfe15c136f1807041cc73dd90f66a904 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Fri, 24 Jan 2025 15:39:57 -0700 Subject: [PATCH 10/20] clippy --- native/core/src/execution/shuffle/row.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index a373d157a..f9ecf4790 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3435,7 +3435,6 @@ fn builder_to_array( } fn make_batch(arrays: Vec<ArrayRef>, row_count: usize) -> Result<RecordBatch, ArrowError> { - let mut dict_id = 0; let fields = arrays .iter() .enumerate() From 2f65a3160d816632de1e45a2e78bc866044ea949 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Mon, 27 Jan 2025 09:16:21 -0700 Subject: [PATCH 11/20] coerce types for CASE expressions --- native/Cargo.lock | 44 ++++++++--------- native/Cargo.toml | 33 ++++--------- native/core/src/execution/planner.rs | 72 ++++++++++++++++++++++++---- 3 files changed, 96 insertions(+), 53 deletions(-) diff --git a/native/Cargo.lock 
b/native/Cargo.lock index e8a291136..5215970dc 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -820,7 +820,7 @@ dependencies = [ [[package]] name = "datafusion" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", @@ -864,7 +864,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow-schema", "async-trait", @@ -964,7 +964,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -989,7 +989,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "log", "tokio", @@ -998,12 +998,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" [[package]] name = "datafusion-execution" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "dashmap", @@ -1021,7 +1021,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "chrono", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "datafusion-common", @@ -1050,7 +1050,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-buffer", @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = 
"git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1101,7 +1101,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1113,7 +1113,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", @@ -1136,7 +1136,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "async-trait", @@ -1151,7 +1151,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "datafusion-common", "datafusion-doc", @@ -1167,7 +1167,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1176,7 +1176,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "datafusion-expr", "quote", @@ -1186,7 +1186,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "chrono", @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1227,7 +1227,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1240,7 +1240,7 @@ dependencies = [ [[package]] name = 
"datafusion-physical-optimizer" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-schema", @@ -1261,7 +1261,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "ahash", "arrow", @@ -1292,7 +1292,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "44.0.0" -source = "git+https://github.com/andygrove/datafusion?branch=issue-14277#d77d5005e841e8137ac2b761741a054a9e7b090e" +source = "git+https://github.com/apache/datafusion?rev=5592834#5592834617f2b8ca99332bdf0be753744af910fd" dependencies = [ "arrow", "arrow-array", diff --git a/native/Cargo.toml b/native/Cargo.toml index b5607bd2d..2c10e1d86 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -22,7 +22,7 @@ resolver = "2" [workspace.package] version = "0.6.0" homepage = "https://datafusion.apache.org/comet" -repository = "https://github.com/andygrove/datafusion-comet" +repository = "https://github.com/apache/datafusion-comet" authors = ["Apache DataFusion <dev@datafusion.apache.org>"] description = "Apache DataFusion Comet: High performance accelerator for Apache Spark" readme = "README.md" @@ -39,28 +39,15 @@ arrow-buffer = { version = "54.0.0" } arrow-data = { version = "54.0.0" } arrow-schema = { version = "54.0.0" } parquet = { version = "54.0.0", default-features = false, features = ["experimental"] } - -datafusion = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -datafusion-common = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-functions = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false, features = ["crypto_expressions"] } -datafusion-functions-nested = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-expr = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-expr-common = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-execution = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-physical-plan = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } -datafusion-physical-expr = { git = "https://github.com/andygrove/datafusion", branch = "issue-14277", default-features = false } - -# temp for testing DataFusion changes locally -#datafusion = { path = "/home/andy/git/apache/datafusion/datafusion/core", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -#datafusion-common = { path = "/home/andy/git/apache/datafusion/datafusion/common", default-features = false } -#datafusion-functions = { path = "/home/andy/git/apache/datafusion/datafusion/functions", default-features = false, features = ["crypto_expressions"] } -#datafusion-functions-nested = { path = 
"/home/andy/git/apache/datafusion/datafusion/functions-nested", default-features = false } -#datafusion-expr = { path = "/home/andy/git/apache/datafusion/datafusion/expr", default-features = false } -#datafusion-expr-common = { path = "/home/andy/git/apache/datafusion/datafusion/expr-common", default-features = false } -#datafusion-execution = { path = "/home/andy/git/apache/datafusion/datafusion/execution", default-features = false } -#datafusion-physical-plan = { path = "/home/andy/git/apache/datafusion/datafusion/physical-plan", default-features = false } -#datafusion-physical-expr = { path = "/home/andy/git/apache/datafusion/datafusion/physical-expr", default-features = false } - +datafusion = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +datafusion-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false, features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "5592834", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" } datafusion-comet-proto = { path = "proto", version = "0.6.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1365e4677..e846f1be8 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -103,6 +103,7 @@ use datafusion_common::{ JoinType as DFJoinType, ScalarValue, }; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression; use datafusion_expr::{ AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, @@ -582,15 +583,70 @@ impl PhysicalPlanner { let else_phy_expr = match &case_when.else_expr { None => None, - Some(_) => { - Some(self.create_expr(case_when.else_expr.as_ref().unwrap(), input_schema)?) 
- } + Some(_) => Some(self.create_expr( + case_when.else_expr.as_ref().unwrap(), + Arc::clone(&input_schema), + )?), }; - Ok(Arc::new(CaseExpr::try_new( - None, - when_then_pairs, - else_phy_expr, - )?)) + + let when_types: Vec<DataType> = when_then_pairs + .iter() + .map(|x| x.0.data_type(&input_schema)) + .collect::<Result<Vec<_>, _>>()?; + let then_types: Vec<DataType> = when_then_pairs + .iter() + .map(|x| x.1.data_type(&input_schema)) + .collect::<Result<Vec<_>, _>>()?; + let else_type: Option<DataType> = else_phy_expr + .as_ref() + .map(|x| Arc::clone(x).data_type(&input_schema)) + .transpose()?; + + let mut when_then_types = vec![]; + when_then_types.extend(when_types); + when_then_types.extend(then_types); + + if let Some(coerce_type) = + get_coerce_type_for_case_expression(&when_then_types, else_type.as_ref()) + { + let cast_options = + SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); + + let when_then_pairs = when_then_pairs + .iter() + .map(|x| { + let w: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( + Arc::clone(&x.0), + coerce_type.clone(), + cast_options.clone(), + )); + let t: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( + Arc::clone(&x.1), + coerce_type.clone(), + cast_options.clone(), + )); + (w, t) + }) + .collect::<Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>>(); + + let else_phy_expr: Option<Arc<dyn PhysicalExpr>> = + else_phy_expr.clone().map(|x| { + Arc::new(Cast::new(x, coerce_type.clone(), cast_options.clone())) + as Arc<dyn PhysicalExpr> + }); + + Ok(Arc::new(CaseExpr::try_new( + None, + when_then_pairs, + else_phy_expr, + )?)) + } else { + Ok(Arc::new(CaseExpr::try_new( + None, + when_then_pairs, + else_phy_expr.clone(), + )?)) + } } ExprStruct::In(expr) => { let value = From 48c641980caf55c2fddde15a3544071fc6ed8984 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Mon, 27 Jan 2025 13:13:35 -0700 Subject: [PATCH 12/20] save experiments --- native/core/src/execution/planner.rs | 35 ++++++++----------- .../comet/CometArrayExpressionSuite.scala | 16 +++++---- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f40aaf1f0..5d5831c6d 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -180,6 +180,8 @@ impl PhysicalPlanner { spark_expr: &Expr, input_schema: SchemaRef, ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + println!("create_expr() {:?}", spark_expr); + match spark_expr.expr_struct.as_ref().unwrap() { ExprStruct::Add(expr) => self.create_binary_expr( expr.left.as_ref().unwrap(), @@ -590,43 +592,35 @@ impl PhysicalPlanner { )?), }; - let when_types: Vec<DataType> = when_then_pairs - .iter() - .map(|x| x.0.data_type(&input_schema)) - .collect::<Result<Vec<_>, _>>()?; let then_types: Vec<DataType> = when_then_pairs .iter() .map(|x| x.1.data_type(&input_schema)) .collect::<Result<Vec<_>, _>>()?; + let else_type: Option<DataType> = else_phy_expr .as_ref() .map(|x| Arc::clone(x).data_type(&input_schema)) - .transpose()?; + .transpose()? 
+ .or(Some(DataType::Null)); - let mut when_then_types = vec![]; - when_then_types.extend(when_types); - when_then_types.extend(then_types); + println!("get_coerce_type_for_case_expression(${then_types:?}, {else_type:?})"); if let Some(coerce_type) = - get_coerce_type_for_case_expression(&when_then_types, else_type.as_ref()) + get_coerce_type_for_case_expression(&then_types, else_type.as_ref()) { + println!("get_coerce_type_for_case_expression -> {coerce_type}"); let cast_options = SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); let when_then_pairs = when_then_pairs .iter() .map(|x| { - let w: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( - Arc::clone(&x.0), - coerce_type.clone(), - cast_options.clone(), - )); let t: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( Arc::clone(&x.1), coerce_type.clone(), cast_options.clone(), )); - (w, t) + (Arc::clone(&x.0), t) }) .collect::<Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>>(); @@ -636,12 +630,13 @@ impl PhysicalPlanner { as Arc<dyn PhysicalExpr> }); - Ok(Arc::new(CaseExpr::try_new( - None, - when_then_pairs, - else_phy_expr, - )?)) + let x = Arc::new(CaseExpr::try_new(None, when_then_pairs, else_phy_expr)?); + + println!("{x:?}"); + + Ok(x) } else { + println!("get_coerce_type_for_case_expression -> None"); Ok(Arc::new(CaseExpr::try_new( None, when_then_pairs, diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 5727f9f90..668aa4189 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -37,12 +37,16 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val path = new Path(dir.toURI.toString, "test.parquet") makeParquetFileAllTypes(path, dictionaryEnabled, 10000) spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator( - sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) - checkSparkAnswerAndOperator( - sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) + withSQLConf( + CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { +// checkSparkAnswerAndOperator( +// sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) +// checkSparkAnswerAndOperator( +// sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) + } } } } From 3de7be1a4a2fabc88114434905361fc7ab265919 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Mon, 27 Jan 2025 16:17:53 -0700 Subject: [PATCH 13/20] test passes --- native/core/src/execution/jni_api.rs | 3 + native/core/src/execution/planner.rs | 126 +++++++++--------- .../spark-expr/src/conversion_funcs/cast.rs | 3 + 3 files changed, 72 insertions(+), 60 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index fe29d8da1..fb147f998 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -473,11 +473,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let start = Instant::now(); let planner = 
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx)) .with_exec_id(exec_context_id); + + println!("BEFORE create_plan"); let (scans, root_op) = planner.create_plan( &exec_context.spark_plan, &mut exec_context.input_sources.clone(), exec_context.partition_count, )?; + println!("AFTER create_plan: {root_op:?}"); let physical_plan_time = start.elapsed(); exec_context.plan_creation_time += physical_plan_time; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 5d5831c6d..220c283d2 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -592,57 +592,7 @@ impl PhysicalPlanner { )?), }; - let then_types: Vec<DataType> = when_then_pairs - .iter() - .map(|x| x.1.data_type(&input_schema)) - .collect::<Result<Vec<_>, _>>()?; - - let else_type: Option<DataType> = else_phy_expr - .as_ref() - .map(|x| Arc::clone(x).data_type(&input_schema)) - .transpose()? - .or(Some(DataType::Null)); - - println!("get_coerce_type_for_case_expression(${then_types:?}, {else_type:?})"); - - if let Some(coerce_type) = - get_coerce_type_for_case_expression(&then_types, else_type.as_ref()) - { - println!("get_coerce_type_for_case_expression -> {coerce_type}"); - let cast_options = - SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); - - let when_then_pairs = when_then_pairs - .iter() - .map(|x| { - let t: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( - Arc::clone(&x.1), - coerce_type.clone(), - cast_options.clone(), - )); - (Arc::clone(&x.0), t) - }) - .collect::<Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>>(); - - let else_phy_expr: Option<Arc<dyn PhysicalExpr>> = - else_phy_expr.clone().map(|x| { - Arc::new(Cast::new(x, coerce_type.clone(), cast_options.clone())) - as Arc<dyn PhysicalExpr> - }); - - let x = Arc::new(CaseExpr::try_new(None, when_then_pairs, else_phy_expr)?); - - println!("{x:?}"); - - Ok(x) - } else { - println!("get_coerce_type_for_case_expression -> None"); - Ok(Arc::new(CaseExpr::try_new( - None, - when_then_pairs, - else_phy_expr.clone(), - )?)) - } + create_case_expr(when_then_pairs, else_phy_expr, &input_schema) } ExprStruct::In(expr) => { let value = @@ -760,12 +710,11 @@ impl PhysicalPlanner { let null_literal_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Null)); - let case_expr = CaseExpr::try_new( - None, + create_case_expr( vec![(is_null_expr, null_literal_expr)], Some(array_append_expr), - )?; - Ok(Arc::new(case_expr)) + &input_schema, + ) } ExprStruct::ArrayInsert(expr) => { let src_array_expr = self.create_expr( @@ -818,13 +767,11 @@ impl PhysicalPlanner { let null_literal_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Null)); - let case_expr = CaseExpr::try_new( - None, + create_case_expr( vec![(is_null_expr, null_literal_expr)], Some(array_remove_expr), - )?; - - Ok(Arc::new(case_expr)) + &input_schema, + ) } ExprStruct::ArrayIntersect(expr) => { let left_expr = @@ -1005,6 +952,8 @@ impl PhysicalPlanner { let children = &spark_plan.children; match spark_plan.op_struct.as_ref().unwrap() { OpStruct::Projection(project) => { + println!("Projection: {project:?}"); + assert!(children.len() == 1); let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; let exprs: PhyExprResult = project @@ -2606,6 +2555,63 @@ fn convert_spark_types_to_arrow_schema( arrow_schema } +/// Create CASE WHEN expression and add casting as needed +fn create_case_expr( + when_then_pairs: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>, + else_expr: 
Option<Arc<dyn PhysicalExpr>>, + input_schema: &Schema, +) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let then_types: Vec<DataType> = when_then_pairs + .iter() + .map(|x| x.1.data_type(&input_schema)) + .collect::<Result<Vec<_>, _>>()?; + + let else_type: Option<DataType> = else_expr + .as_ref() + .map(|x| Arc::clone(x).data_type(&input_schema)) + .transpose()? + .or(Some(DataType::Null)); + + if let Some(coerce_type) = get_coerce_type_for_case_expression(&then_types, else_type.as_ref()) + { + println!( + "get_coerce_type_for_case_expression(${then_types:?}, {else_type:?}) -> {coerce_type}" + ); + + let cast_options = SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); + + let when_then_pairs = when_then_pairs + .iter() + .map(|x| { + let t: Arc<dyn PhysicalExpr> = Arc::new(Cast::new( + Arc::clone(&x.1), + coerce_type.clone(), + cast_options.clone(), + )); + (Arc::clone(&x.0), t) + }) + .collect::<Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>>(); + + let else_phy_expr: Option<Arc<dyn PhysicalExpr>> = else_expr.clone().map(|x| { + Arc::new(Cast::new(x, coerce_type.clone(), cast_options.clone())) + as Arc<dyn PhysicalExpr> + }); + + let x = Arc::new(CaseExpr::try_new(None, when_then_pairs, else_phy_expr)?); + + println!("AFTER COERCION: {x:?}"); + + Ok(x) + } else { + println!("get_coerce_type_for_case_expression -> None"); + Ok(Arc::new(CaseExpr::try_new( + None, + when_then_pairs, + else_expr.clone(), + )?)) + } +} + #[cfg(test)] mod tests { use std::{sync::Arc, task::Poll}; diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index 0f5976132..884615287 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -988,6 +988,9 @@ fn is_datafusion_spark_compatible( return true; } match from_type { + DataType::Null => { + matches!(to_type, DataType::List(_)) + }, DataType::Boolean => matches!( to_type, DataType::Int8 From e8f222d98bdfc8a8c8ea3fbc1ae0bc7c2f8a2362 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Mon, 27 Jan 2025 16:18:33 -0700 Subject: [PATCH 14/20] remove debug logging --- native/core/src/execution/planner.rs | 20 +++++-------------- .../spark-expr/src/conversion_funcs/cast.rs | 2 +- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 220c283d2..1c9292a1c 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -180,8 +180,6 @@ impl PhysicalPlanner { spark_expr: &Expr, input_schema: SchemaRef, ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { - println!("create_expr() {:?}", spark_expr); - match spark_expr.expr_struct.as_ref().unwrap() { ExprStruct::Add(expr) => self.create_binary_expr( expr.left.as_ref().unwrap(), @@ -952,8 +950,6 @@ impl PhysicalPlanner { let children = &spark_plan.children; match spark_plan.op_struct.as_ref().unwrap() { OpStruct::Projection(project) => { - println!("Projection: {project:?}"); - assert!(children.len() == 1); let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; let exprs: PhyExprResult = project @@ -2574,10 +2570,6 @@ fn create_case_expr( if let Some(coerce_type) = get_coerce_type_for_case_expression(&then_types, else_type.as_ref()) { - println!( - "get_coerce_type_for_case_expression(${then_types:?}, {else_type:?}) -> {coerce_type}" - ); - let cast_options = SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); let 
when_then_pairs = when_then_pairs
@@ -2596,14 +2588,12 @@ fn create_case_expr(
             Arc::new(Cast::new(x, coerce_type.clone(), cast_options.clone()))
                 as Arc<dyn PhysicalExpr>
         });
-
-        let x = Arc::new(CaseExpr::try_new(None, when_then_pairs, else_phy_expr)?);
-
-        println!("AFTER COERCION: {x:?}");
-
-        Ok(x)
+        Ok(Arc::new(CaseExpr::try_new(
+            None,
+            when_then_pairs,
+            else_phy_expr,
+        )?));
     } else {
-        println!("get_coerce_type_for_case_expression -> None");
         Ok(Arc::new(CaseExpr::try_new(
             None,
             when_then_pairs,
diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index 884615287..22b09f610 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -990,7 +990,7 @@ fn is_datafusion_spark_compatible(
     match from_type {
         DataType::Null => {
             matches!(to_type, DataType::List(_))
-        },
+        }
         DataType::Boolean => matches!(
             to_type,
             DataType::Int8

From 6fff6af070140d9df1783d5ffda5cd71f94c41a6 Mon Sep 17 00:00:00 2001
From: Andy Grove <agrove@apache.org>
Date: Mon, 27 Jan 2025 16:19:13 -0700
Subject: [PATCH 15/20] remove debug logging

---
 native/core/src/execution/jni_api.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index fb147f998..8e324845a 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -474,13 +474,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
             .with_exec_id(exec_context_id);
 
-        println!("BEFORE create_plan");
         let (scans, root_op) = planner.create_plan(
             &exec_context.spark_plan,
             &mut exec_context.input_sources.clone(),
             exec_context.partition_count,
         )?;
-        println!("AFTER create_plan: {root_op:?}");
         let physical_plan_time = start.elapsed();
         exec_context.plan_creation_time += physical_plan_time;

From 18a2c59e5af919a93475a977c271c3fdf90dc45a Mon Sep 17 00:00:00 2001
From: Andy Grove <agrove@apache.org>
Date: Mon, 27 Jan 2025 16:21:33 -0700
Subject: [PATCH 16/20] fix stray semicolon from previous commit

---
 native/core/src/execution/planner.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 1c9292a1c..c92a3bdbc 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2592,7 +2592,7 @@ fn create_case_expr(
             None,
             when_then_pairs,
             else_phy_expr,
-        )?));
+        )?))
     } else {
         Ok(Arc::new(CaseExpr::try_new(
             None,

From c86dd391d8136346be2056d7a8fe7a18dbefcfde Mon Sep 17 00:00:00 2001
From: Andy Grove <agrove@apache.org>
Date: Mon, 27 Jan 2025 16:25:28 -0700
Subject: [PATCH 17/20] revert test change

---
 .../apache/comet/CometArrayExpressionSuite.scala | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 668aa4189..5727f9f90 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -37,16 +37,12 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
         spark.read.parquet(path.toString).createOrReplaceTempView("t1")
-        withSQLConf(
CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true", - CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { -// checkSparkAnswerAndOperator( -// sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) -// checkSparkAnswerAndOperator( -// sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) - } + checkSparkAnswerAndOperator( + sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) + checkSparkAnswerAndOperator( + sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) } } } From 453bf5b417bc3616411a88878f735f56a7ddc098 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Mon, 27 Jan 2025 16:33:11 -0700 Subject: [PATCH 18/20] clippy --- native/core/src/execution/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c92a3bdbc..75b64bbab 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2559,12 +2559,12 @@ fn create_case_expr( ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { let then_types: Vec<DataType> = when_then_pairs .iter() - .map(|x| x.1.data_type(&input_schema)) + .map(|x| x.1.data_type(input_schema)) .collect::<Result<Vec<_>, _>>()?; let else_type: Option<DataType> = else_expr .as_ref() - .map(|x| Arc::clone(x).data_type(&input_schema)) + .map(|x| Arc::clone(x).data_type(input_schema)) .transpose()? .or(Some(DataType::Null)); From 65acbb53fc2525ccf1c80c30dc16d9b6a5b08371 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Tue, 28 Jan 2025 10:25:57 -0700 Subject: [PATCH 19/20] revert whitespace change --- native/core/src/execution/jni_api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 8e324845a..fe29d8da1 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -473,7 +473,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let start = Instant::now(); let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx)) .with_exec_id(exec_context_id); - let (scans, root_op) = planner.create_plan( &exec_context.spark_plan, &mut exec_context.input_sources.clone(), From 6f8d2fa97763f6c19bdfdd1de773e0885031c4c8 Mon Sep 17 00:00:00 2001 From: Andy Grove <agrove@apache.org> Date: Tue, 28 Jan 2025 10:35:41 -0700 Subject: [PATCH 20/20] Set rust-version to 1.81 --- native/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/Cargo.toml b/native/Cargo.toml index 2c10e1d86..29fb269db 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -30,7 +30,7 @@ license = "Apache-2.0" edition = "2021" # Comet uses the same minimum Rust version as DataFusion -rust-version = "1.79" +rust-version = "1.81" [workspace.dependencies] arrow = { version = "54.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
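
Notes on the techniques used in this series (these notes are not part of the
patches themselves). First, the CASE handling the series converges on: only
the THEN branch types plus the ELSE type are coerced to a common type, and the
WHEN conditions are left alone since they must remain Boolean (which is why
patch 12 drops the coercion of WHEN expressions that patch 11 introduced).
A minimal standalone sketch of the coercion rule, assuming
`get_coerce_type_for_case_expression` from the pinned DataFusion rev behaves
as the patches rely on; the List element type here is illustrative only:

use arrow::datatypes::DataType;
use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;

fn main() {
    // Mirrors `CASE WHEN _2 = _3 THEN array(_2, _3, _4) ELSE NULL END`:
    // one List THEN branch plus a Null ELSE should reduce to the List type,
    // after which the planner wraps each branch in a Spark-compatible cast.
    let then_types = vec![DataType::new_list(DataType::Int32, true)];
    let else_type = Some(DataType::Null);
    let coerced = get_coerce_type_for_case_expression(&then_types, else_type.as_ref());
    println!("coerced CASE type: {coerced:?}"); // expected: Some(List(...))
}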
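
Second, why patch 09 can drop the deprecated `Field::new_dict`: in arrow 54
the per-field dictionary ID is deprecated, so a dictionary-encoded column is
fully described by a plain `Field` whose data type is `DataType::Dictionary`.
A small sketch:

use arrow::datatypes::{DataType, Field};

fn main() {
    // Equivalent to the removed Field::new_dict call: the dict_id and
    // dict_is_ordered arguments carried no information the schema needs.
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let field = Field::new("col_0", dict_type, true);
    assert!(matches!(field.data_type(), DataType::Dictionary(_, _)));
}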
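
Third, the `DataType::Null => matches!(to_type, DataType::List(_))` arm added
in patch 13 lets Comet delegate a Null-to-List cast to arrow's cast kernel,
which materializes an all-null array of the target type. A sketch of the
behavior being relied on, assuming arrow 54's `cast` kernel:

use std::sync::Arc;
use arrow::array::{Array, NullArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // The `ELSE NULL` branch of the rewritten CASE arrives as a NullArray;
    // casting it to the coerced List type yields all-null list entries.
    let input: Arc<dyn Array> = Arc::new(NullArray::new(3));
    let target = DataType::new_list(DataType::Int32, true);
    let out = cast(&input, &target).expect("Null -> List cast");
    assert_eq!(out.null_count(), 3);
}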