fix: SortMergeJoin for timestamp keys #1901

Open: SKY-ALIN wants to merge 5 commits into main

Conversation

SKY-ALIN

Which issue does this PR close?

Closes #1900.

Rationale for this change

This type is supported, but it was missed at the proto serialization stage, and the fallback message formatting is incorrect.

How are these changes tested?

These changes are tested locally by comparing results from Spark with and without the Comet extension.

@SKY-ALIN
Author

SKY-ALIN commented Jun 18, 2025

It also fixes the message formatting; now it looks like this:

25/06/18 12:53:47 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
Project
+- BroadcastHashJoin
   :- Project
   :  +- Window
   :     +- Sort
   :        +-  Exchange [COMET: ]
   :           +- Project
   :              +-  SortMergeJoin [COMET: Unsupported join key type TimestampType on key CAST(time AS TIMESTAMP)]
...

After adding TimestampType to the match statement, this message no longer appears at all; just noting it for clarity. :)

@SKY-ALIN SKY-ALIN changed the title Fix SortMergeJoin for timestamp keys fix: SortMergeJoin for timestamp keys Jun 18, 2025
@mbutrovich
Contributor

mbutrovich commented Jun 18, 2025

Thanks for the contribution, @SKY-ALIN! Could we add a test case with timestamps as the join key?

@@ -2168,7 +2168,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
    */
   private def supportedSortMergeJoinEqualType(dataType: DataType): Boolean = dataType match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-        _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType =>
+        _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType |
+        _: TimestampType =>
Member

Should TimestampNTZType also be supported?

Author

TimestampNTZType is supported; it is handled by a separate case one line below.

    case TimestampNTZType => true
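
For reference, a sketch of how the relevant part of the match likely reads after this change, assembled from the diff above and the quoted line (the fallthrough case and the case bodies are assumptions):

  // Sketch only: pieced together from the diff and the quoted line; the
  // exact surrounding cases and bodies are assumptions.
  private def supportedSortMergeJoinEqualType(dataType: DataType): Boolean = dataType match {
    case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
        _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType |
        _: TimestampType =>
      true
    // TimestampNTZType is matched by its own case, as noted above.
    case TimestampNTZType => true
    case _ => false
  }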

@parthchandra
Contributor

> Thanks for the contribution, @SKY-ALIN! Could we add a test case with timestamps as the join key?

The test should have the left-side and right-side timestamps in different timezones.

> Should TimestampNTZType also be supported?

If the above test case passes, we probably can.

@codecov-commenter

codecov-commenter commented Jun 19, 2025

Codecov Report

Attention: Patch coverage is 0% with 4 lines in your changes missing coverage. Please review.

Project coverage is 42.54%. Comparing base (f09f8af) to head (75d681d).
Report is 270 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 0.00% 4 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1901       +/-   ##
=============================================
- Coverage     56.12%   42.54%   -13.58%     
+ Complexity      976      938       -38     
=============================================
  Files           119      130       +11     
  Lines         11743    12828     +1085     
  Branches       2251     2414      +163     
=============================================
- Hits           6591     5458     -1133     
- Misses         4012     6283     +2271     
+ Partials       1140     1087       -53     


@SKY-ALIN
Author

@mbutrovich done.

@@ -54,25 +54,6 @@ class CometJoinSuite extends CometTestBase {
       .toSeq)
   }
 
-  test("SortMergeJoin with unsupported key type should fall back to Spark") {
Contributor

The test you have added is great. Thank you!

However, this removed test exercises a few things that we should also check for: timestamps read from Parquet, and the actual plan created. For the latter, you can simply change the test to use checkSparkAnswerAndOperator and remove the check that the canonicalized plans are the same.

Also, to make sure we are really testing timestamps, the left side and the right side of the join should use timestamps in different timezones.

To create timestamps with different timezones, we can modify this test to create the test files separately:

  withSQLConf(
    SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
    SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
    withTable("t1", "t2") {
      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Australia/Darwin") {
        sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET")
        sql("INSERT OVERWRITE t1 VALUES('a', timestamp'2019-01-01 11:11:11')")
      }
      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Canada/Pacific") {
        sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET")
        sql("INSERT OVERWRITE t2 VALUES('a', timestamp'2019-01-01 11:11:11')")
      }
      ...

The join above will return zero rows, since the two inserted timestamps, written under different session timezones, are not the same instant (see the sketch below).
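
For illustration (not from this PR), a minimal sketch of why the same literal written under two session timezones yields different instants and therefore an empty equality join; the spark session handle is assumed:

  // Spark interprets a TIMESTAMP literal in the session-local timezone and
  // stores it as a UTC instant, so the same wall-clock literal differs
  // between the two tables above.
  spark.conf.set("spark.sql.session.timeZone", "Australia/Darwin") // UTC+9:30
  val darwin = spark.sql(
    "SELECT unix_timestamp(timestamp'2019-01-01 11:11:11')").head.getLong(0)

  spark.conf.set("spark.sql.session.timeZone", "Canada/Pacific")   // UTC-8 in January
  val pacific = spark.sql(
    "SELECT unix_timestamp(timestamp'2019-01-01 11:11:11')").head.getLong(0)

  // darwin and pacific differ by 17.5 hours, so t1.time = t2.time matches nothing.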

Also, we could rename the test.

Author

Such a test will not work, because the current implementation of DataFusion doesn't handle this case.

Contributor

A DF join on timestamp columns does not take into account time zones? Then it wouldn't be correct, would it?

@parthchandra
Contributor

I don't think this PR is making the correct change.
With this PR, the removed test fails to execute the query (let alone pass the assertion):

  test("SortMergeJoin with unsupported key type should fall back to Spark") {
    withSQLConf(
      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
      SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET")
        sql("INSERT OVERWRITE t1 VALUES('a', timestamp'2019-01-01 11:11:11')")

        sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET")
        sql("INSERT OVERWRITE t2 VALUES('a', timestamp'2019-01-01 11:11:11')")

        val df = sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time")
        val (sparkPlan, cometPlan) = checkSparkAnswer(df)                // should NOT fail here, but does
        assert(sparkPlan.canonicalized === cometPlan.canonicalized)       // should fail here
      }
    }
  }

The reason for the failure is:

org.apache.comet.CometNativeException: Unsupported data type in sort merge join comparator: Timestamp(Microsecond, Some("UTC"))

This means that supportedSortMergeJoinEqualType should not return true for TimestampType.

The newly added test does not exercise the change. The plan (see below) does not include a SortMergeJoin, and the test passes both with and without the changes in this PR.

AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [ts#4], [ts#10], Inner, BuildRight, false
   :- LocalTableScan [ts#4]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, timestamp, true]),false), [plan_id=106]
      +- LocalTableScan [ts#10]
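
For reference, a sketch (not part of this PR) of how a test could force a SortMergeJoin so that supportedSortMergeJoinEqualType is actually exercised; the table names are illustrative, the broadcast-threshold settings come from the removed test, and spark.sql.join.preferSortMergeJoin is an additional standard Spark config:

  // Sketch only: disable broadcast joins so the equi-join on the timestamp
  // key plans as a SortMergeJoin rather than a BroadcastHashJoin.
  withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "true") {
    withTable("t1", "t2") {
      sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET")
      sql("INSERT OVERWRITE t1 VALUES('a', timestamp'2019-01-01 11:11:11')")
      sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET")
      sql("INSERT OVERWRITE t2 VALUES('a', timestamp'2019-01-01 11:11:11')")

      val df = sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time")
      // checkSparkAnswerAndOperator (suggested earlier in the review) checks
      // the result against Spark and that the join runs as a Comet operator.
      checkSparkAnswerAndOperator(df)
    }
  }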
