Skip to content

Commit d6ca11e

Browse files
jonathan-albrecht-ibm authored and HeartSaVioR committed
[SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms
### What changes were proposed in this pull request?

Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms.

### Why are the changes needed?

The timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead of LongType which breaks serialization on big endian platforms. This can't be fixed because it would be a breaking schema change so skip the tests instead.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tested with existing tests on amd64 (little endian) and s390x (big endian)

Below is the test result from s390x:

```
- flatMapGroupsWithState - streaming with processing time timeout - state format version 1 !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state format version 2
--
- flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore) !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore) !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!!
  FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
  java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
  java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
  java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49811 from jonathan-albrecht-ibm/master-endian-flatMapGroups.

Authored-by: Jonathan Albrecht <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent da1854e commit d6ca11e

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
package org.apache.spark.sql.execution.datasources.v2.state
1818

1919
import java.io.{File, FileWriter}
20+
import java.nio.ByteOrder
2021

2122
import org.apache.hadoop.conf.Configuration
2223
import org.scalatest.Assertions
@@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
794795
}
795796

796797
test("flatMapGroupsWithState, state ver 1") {
798+
// Skip this test on big endian platforms because the timestampTimeoutAttribute of
799+
// StateManagerImplV1 is declared as IntegerType instead of LongType which breaks
800+
// serialization on big endian. This can't be fixed because it would be a breaking
801+
// schema change.
// When the assumption below does not hold, the test is reported as CANCELED rather
// than failed, and the state format version 1 check that follows is never run.
802+
assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN))
797803
testFlatMapGroupsWithState(1)
798804
}
799805

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.streaming
1919

2020
import java.io.File
21+
import java.nio.ByteOrder
2122
import java.sql.Timestamp
2223

2324
import org.apache.commons.io.FileUtils
@@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
458459
checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF())
459460
}
460461

462+
// Skip the v1 tests with timeout on big endian platforms because the
463+
// timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead
464+
// of LongType which breaks serialization on big endian. This can't be fixed because it
465+
// would be a breaking schema change.
466+
// Returns true when tests using the given state format version can run on this platform.
// Any version other than 1 is always reported as supported; version 1 is supported only
// when the platform's native byte order is little endian.
def isStateFormatSupported(stateFormatVersion: Int): Boolean = {
467+
stateFormatVersion != 1 || ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)
468+
}
469+
461470
testWithAllStateVersions("flatMapGroupsWithState - streaming with processing time timeout") {
471+
assume(
472+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
462473
// Function to maintain the count as state and set the proc. time timeout delay of 10 seconds.
463474
// It returns the count if changed, or -1 if the state was removed by timeout.
464475
val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
@@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
526537
}
527538

528539
testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time timeout + watermark") {
540+
assume(
541+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
529542
val inputData = MemoryStream[(String, Int)]
530543
val result =
531544
inputData.toDS()

0 commit comments

Comments
 (0)