-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: Support array_intersect function #1271
Conversation
makeParquetFileAllTypes(path, dictionaryEnabled, 10000) | ||
spark.read.parquet(path.toString).createOrReplaceTempView("t1") | ||
checkSparkAnswerAndOperator( | ||
sql("SELECT array_intersect(array(_2, _3, _4), array(_9, _10)) from t1")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't obvious to me whether any of these arrays actually intersect. Perhaps you could add one that is guaranteed to intersect such as array_intersect(array(_2, _3, _4), array(_3, _4))
or does Spark optimize that out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Updated unit test case. Spark and Comet Physical Plans are as follows:
Spark Physical Plan:
*(1) Project [array_intersect(array(cast(_2#1 as int), cast(_3#2 as int), _4#3), array(cast(_3#2 as int), _4#3)) AS array_intersect(array(_2, _3, _4), array(_3, _4))#44]
+- *(1) ColumnarToRow
+- FileScan parquet [_2#1,_3#2,_4#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/jq/jhn012m16zzg7dc9lcgbdvjc0000gp/T/spark-97..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_2:tinyint,_3:smallint,_4:int>
Comet Physical Plan:
*(1) CometColumnarToRow
+- CometProject [array_intersect(array(_2, _3, _4), array(_3, _4))#49], [array_intersect(array(cast(_2#1 as int), cast(_3#2 as int), _4#3), array(cast(_3#2 as int), _4#3)) AS array_intersect(array(_2, _3, _4), array(_3, _4))#49]
+- CometScan parquet [_2#1,_3#2,_4#3] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/jq/jhn012m16zzg7dc9lcgbdvjc0000gp/T/spark-97..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_2:tinyint,_3:smallint,_4:int>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you intend to add a commit that updates the unit test? I don't see any changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, i have just pushed it. Thanks for the letting me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @erenavsarogullari. It would be good to also have tests for null and empty arrays and also for other data types such as strings, but I think we can handle that as part of #1269 since this applies to all of the recently added array functions.
Sure @andygrove. I can also work on #1269 by assigning myself and cover |
Thanks @erenavsarogullari. It would be great to have help with this. I will try and add some more notes to the issue with suggestions for how we can improve coverage. |
Thanks for #1308. We will need to apply same approach to other array functions after #1308 is merged as part of #1269. I think our scope is here to test all supported types per array function and catch violations after passing |
I agree. We are hoping to merge the comet-parquet-exec branch into main today or tomorrow, and once that is done I will go ahead and start merging the current array function PRs and then we can work on the testing. |
Which issue does this PR close?
Related to Epic: #1042
array_intersect:
select array_intersect(array(1, 2, 3), array(2, 3, 4))
=>array(2, 3)
Rationale for this change
Defined under Epic: #1042
What changes are included in this PR?
planner.rs:
Created DataFusionarray_intersect
physical expression from Spark physical expression,expr.proto:
array_intersect
message has been added,QueryPlanSerde.scala:
array_intersect
pattern matching case has been added,CometExpressionSuite.scala:
A new UT has been added for array_intersect function.How are these changes tested?
A new UT has been added.