From 98e423a5d2e716004c25980b82fae230f38124e7 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 31 Jan 2025 08:35:37 -0800 Subject: [PATCH 01/23] Disallowing non-nullable schema when Avro encoding is used for TransformWithState --- .../resources/error/error-conditions.json | 8 +++ .../test_pandas_transform_with_state.py | 50 +++++++++++++++++++ .../StatefulProcessorHandleImpl.scala | 17 +++++-- .../streaming/state/StateStoreErrors.scala | 15 ++++++ 4 files changed, 85 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3bac6638f7060..e1e39709438ef 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4715,6 +4715,14 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : { + "message" : [ + "If schema evolution is enabled, all the fields in the schema for column family must be nullable.", + "Please set the 'spark.sql.streaming.stateStore.encodingFormat' to 'UnsafeRow' or make the schema nullable.", + "Current schema: " + ], + "sqlState" : "XXKST" + }, "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : { "message" : [ "Provided value schema does not match existing state value schema.", diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py index d554a0cb37d73..6e872f8bb0dc0 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py @@ -1469,7 +1469,37 @@ def check_exception(error): df, check_exception=check_exception, ) + def test_not_nullable_fails(self): + with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}): + with tempfile.TemporaryDirectory() as checkpoint_dir: + input_path = tempfile.mkdtemp() + self._prepare_test_resource1(input_path) + + df = self._build_test_df(input_path) + + def check_basic_state(batch_df, batch_id): + result = batch_df.collect()[0] + assert result.value["id"] == 0 # First ID from test data + assert result.value["name"] == "name-0" + def check_exception(error): + from pyspark.errors.exceptions.captured import StreamingQueryException + + if not isinstance(error, StreamingQueryException): + return False + + error_msg = str(error) + return ( + "[STATE_STORE_SCHEMA_MUST_BE_NULLABLE]" in error_msg + and "column family state must be nullable" in error_msg + ) + self._run_evolution_test( + BasicProcessorNotNullable(), + checkpoint_dir, + check_basic_state, + df, + check_exception=check_exception + ) class SimpleStatefulProcessorWithInitialState(StatefulProcessor): # this dict is the same as input initial state dataframe @@ -1892,6 +1922,26 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]: def close(self) -> None: pass +class BasicProcessorNotNullable(StatefulProcessor): + # Schema definitions + state_schema = StructType( + [StructField("id", IntegerType(), False), StructField("name", StringType(), False)] + ) + + def init(self, handle): + self.state = handle.getValueState("state", self.state_schema) + + def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]: + for pdf in rows: + pass + id_val = int(key[0]) + name = f"name-{id_val}" + self.state.update((id_val, name)) + yield pd.DataFrame({"id": [key[0]], "value": [{"id": id_val, "name": name}]}) + + def close(self) -> None: + pass + class AddFieldsProcessor(StatefulProcessor): state_schema = StructType( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 9c58118385bbd..1bef8ff4ce0d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -364,13 +364,20 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi } def getColumnFamilySchemas( - setNullableFields: Boolean + shouldFieldsBeNullable: Boolean ): Map[String, StateStoreColFamilySchema] = { val schemas = columnFamilySchemas.toMap - if (setNullableFields) { - schemas.map { case (colFamilyName, stateStoreColFamilySchema) => - colFamilyName -> stateStoreColFamilySchema.copy( - valueSchema = stateStoreColFamilySchema.valueSchema.toNullable + if (shouldFieldsBeNullable) { + schemas.map { case (colFamilyName, schema) => + // assert that each field is nullable if schema evolution is enabled + schema.valueSchema.fields.foreach { field => + if (!field.nullable) { + throw StateStoreErrors.stateStoreSchemaMustBeNullable( + schema.colFamilyName, schema.valueSchema.toString()) + } + } + colFamilyName -> schema.copy( + valueSchema = schema.valueSchema.toNullable ) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index f2b875a9113f8..9dabd811c709d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -145,6 +145,12 @@ object StateStoreErrors { new StateStoreValueSchemaNotCompatible(storedValueSchema, newValueSchema) } + def stateStoreSchemaMustBeNullable( + columnFamilyName: String, + schema: String): StateStoreSchemaMustBeNullable = { + new StateStoreSchemaMustBeNullable(columnFamilyName, schema) + } + def stateStoreInvalidValueSchemaEvolution( oldValueSchema: String, newValueSchema: String): StateStoreInvalidValueSchemaEvolution = { @@ -346,6 +352,15 @@ class StateStoreValueSchemaNotCompatible( "storedValueSchema" -> storedValueSchema, "newValueSchema" -> newValueSchema)) +class StateStoreSchemaMustBeNullable( + columnFamilyName: String, + schema: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_SCHEMA_MUST_BE_NULLABLE", + messageParameters = Map( + "columnFamilyName" -> columnFamilyName, + "schema" -> schema)) + class StateStoreInvalidValueSchemaEvolution( oldValueSchema: String, newValueSchema: String) From df5359eb7ed39bfc31b5bae1e4054fd6d86d5681 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 31 Jan 2025 09:47:54 -0800 Subject: [PATCH 02/23] adding two booleans --- .../execution/python/TransformWithStateInPandasExec.scala | 4 +++- .../execution/streaming/StatefulProcessorHandleImpl.scala | 7 ++++--- .../sql/execution/streaming/TransformWithStateExec.scala | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala index 1cff218229b87..09919209c99ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala @@ -117,7 +117,9 @@ case class TransformWithStateInPandasExec( override def getColFamilySchemas( setNullableFields: Boolean ): Map[String, StateStoreColFamilySchema] = { - driverProcessorHandle.getColumnFamilySchemas(setNullableFields) + driverProcessorHandle.getColumnFamilySchemas( + setNullableFields = setNullableFields, + ensureNullableFields = true) } override def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 1bef8ff4ce0d3..2d394f3ec30c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -364,14 +364,15 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi } def getColumnFamilySchemas( - shouldFieldsBeNullable: Boolean + setNullableFields: Boolean = true, + ensureNullableFields: Boolean = false ): Map[String, StateStoreColFamilySchema] = { val schemas = columnFamilySchemas.toMap - if (shouldFieldsBeNullable) { + if (setNullableFields) { schemas.map { case (colFamilyName, schema) => // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => - if (!field.nullable) { + if (!field.nullable && ensureNullableFields) { throw StateStoreErrors.stateStoreSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 6234dd3ada39f..72564cd6354b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -150,7 +150,7 @@ case class TransformWithStateExec( Some(NoPrefixKeyStateEncoderSpec(keySchema))) val columnFamilySchemas = getDriverProcessorHandle() - .getColumnFamilySchemas(setNullableFields) ++ + .getColumnFamilySchemas(setNullableFields = setNullableFields) ++ Map(StateStore.DEFAULT_COL_FAMILY_NAME -> defaultSchema) closeProcessorHandle() columnFamilySchemas From 16106a618cc7c223b31360f0aadbd4c74f70b2a3 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 31 Jan 2025 13:01:52 -0800 Subject: [PATCH 03/23] feedback --- .../resources/error/error-conditions.json | 3 +-- .../TransformWithStateInPandasExec.scala | 4 +-- .../StatefulProcessorHandleImpl.scala | 27 +++++++------------ .../streaming/TransformWithStateExec.scala | 2 +- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e1e39709438ef..8dd31595b5833 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4718,8 +4718,7 @@ "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ "If schema evolution is enabled, all the fields in the schema for column family must be nullable.", - "Please set the 'spark.sql.streaming.stateStore.encodingFormat' to 'UnsafeRow' or make the schema nullable.", - "Current schema: " + "Please make the schema nullable. Current schema: " ], "sqlState" : "XXKST" }, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala index 09919209c99ef..1595ac0e40a50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala @@ -117,9 +117,7 @@ case class TransformWithStateInPandasExec( override def getColFamilySchemas( setNullableFields: Boolean ): Map[String, StateStoreColFamilySchema] = { - driverProcessorHandle.getColumnFamilySchemas( - setNullableFields = setNullableFields, - ensureNullableFields = true) + driverProcessorHandle.getColumnFamilySchemas() } override def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 2d394f3ec30c7..3199e1ef6527a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -363,26 +363,19 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi addTimerColFamily() } - def getColumnFamilySchemas( - setNullableFields: Boolean = true, - ensureNullableFields: Boolean = false - ): Map[String, StateStoreColFamilySchema] = { + def getColumnFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val schemas = columnFamilySchemas.toMap - if (setNullableFields) { - schemas.map { case (colFamilyName, schema) => - // assert that each field is nullable if schema evolution is enabled - schema.valueSchema.fields.foreach { field => - if (!field.nullable && ensureNullableFields) { - throw StateStoreErrors.stateStoreSchemaMustBeNullable( - schema.colFamilyName, schema.valueSchema.toString()) - } + schemas.map { case (colFamilyName, schema) => + // assert that each field is nullable if schema evolution is enabled + schema.valueSchema.fields.foreach { field => + if (!field.nullable) { + throw StateStoreErrors.stateStoreSchemaMustBeNullable( + schema.colFamilyName, schema.valueSchema.toString()) } - colFamilyName -> schema.copy( - valueSchema = schema.valueSchema.toNullable - ) } - } else { - schemas + colFamilyName -> schema.copy( + valueSchema = schema.valueSchema.toNullable + ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 72564cd6354b5..bbfdf8c3d895f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -150,7 +150,7 @@ case class TransformWithStateExec( Some(NoPrefixKeyStateEncoderSpec(keySchema))) val columnFamilySchemas = getDriverProcessorHandle() - .getColumnFamilySchemas(setNullableFields = setNullableFields) ++ + .getColumnFamilySchemas() ++ Map(StateStore.DEFAULT_COL_FAMILY_NAME -> defaultSchema) closeProcessorHandle() columnFamilySchemas From fbfa147a15b28f5508a5db55e320fd4e2ed319c4 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 31 Jan 2025 13:06:10 -0800 Subject: [PATCH 04/23] this should be it --- .../execution/python/TransformWithStateInPandasExec.scala | 2 +- .../execution/streaming/StatefulProcessorHandleImpl.scala | 6 ++++-- .../sql/execution/streaming/TransformWithStateExec.scala | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala index 1595ac0e40a50..c2fdc82fde330 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala @@ -117,7 +117,7 @@ case class TransformWithStateInPandasExec( override def getColFamilySchemas( setNullableFields: Boolean ): Map[String, StateStoreColFamilySchema] = { - driverProcessorHandle.getColumnFamilySchemas() + driverProcessorHandle.getColumnFamilySchemas(true) } override def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 3199e1ef6527a..5b4bf3e3c1007 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -363,12 +363,14 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi addTimerColFamily() } - def getColumnFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + def getColumnFamilySchemas( + shouldCheckNullable: Boolean + ): Map[String, StateStoreColFamilySchema] = { val schemas = columnFamilySchemas.toMap schemas.map { case (colFamilyName, schema) => // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => - if (!field.nullable) { + if (!field.nullable && shouldCheckNullable) { throw StateStoreErrors.stateStoreSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index bbfdf8c3d895f..fd430d3b69d69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -150,7 +150,7 @@ case class TransformWithStateExec( Some(NoPrefixKeyStateEncoderSpec(keySchema))) val columnFamilySchemas = getDriverProcessorHandle() - .getColumnFamilySchemas() ++ + .getColumnFamilySchemas(false) ++ Map(StateStore.DEFAULT_COL_FAMILY_NAME -> defaultSchema) closeProcessorHandle() columnFamilySchemas From 67861335feaa654b8c021da8ec31fce876168adb Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Sun, 2 Feb 2025 20:09:57 -0800 Subject: [PATCH 05/23] adding check --- .../execution/streaming/StatefulProcessorHandleImpl.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 5b4bf3e3c1007..5ef0c0b76d93a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -363,6 +363,10 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi addTimerColFamily() } + private def isInternal(columnFamilyName: String): Boolean = { + columnFamilyName.startsWith("_") || columnFamilyName.startsWith("$") + } + def getColumnFamilySchemas( shouldCheckNullable: Boolean ): Map[String, StateStoreColFamilySchema] = { @@ -370,7 +374,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi schemas.map { case (colFamilyName, schema) => // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => - if (!field.nullable && shouldCheckNullable) { + if (!field.nullable && shouldCheckNullable && !isInternal(colFamilyName)) { throw StateStoreErrors.stateStoreSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } From 96b14c375a1c4b2d0eb0fb1f2e476ca80c8c69b5 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Sun, 2 Feb 2025 20:22:25 -0800 Subject: [PATCH 06/23] should be nullable --- .../spark/sql/streaming/TransformWithStateSuite.scala | 5 ++--- .../streaming/TransformWithValueStateTTLSuite.scala | 11 +++++------ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index a76e63df873e7..5652adb5c5e41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -1815,7 +1815,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { withTempDir { checkpointDir => // When Avro is used, we want to set the StructFields to nullable - val shouldBeNullable = usingAvroEncoding() val metadataPathPostfix = "state/0/_stateSchema/default" val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") @@ -1826,7 +1825,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val schema0 = StateStoreColFamilySchema( "countState", 0, keySchema, 0, - new StructType().add("value", LongType, nullable = shouldBeNullable), + new StructType().add("value", LongType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) @@ -1834,7 +1833,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest "listState", 0, keySchema, 0, new StructType() - .add("id", LongType, nullable = shouldBeNullable) + .add("id", LongType, nullable = true) .add("name", StringType), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala index 5d87227ffd58a..74b12e4dbea45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala @@ -273,7 +273,6 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { ) { withTempDir { checkpointDir => // When Avro is used, we want to set the StructFields to nullable - val shouldBeNullable = usingAvroEncoding() val metadataPathPostfix = "state/0/_stateSchema/default" val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") val hadoopConf = spark.sessionState.newHadoopConf() @@ -317,7 +316,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { val schema2 = StateStoreColFamilySchema( "$count_listState", 0, keySchema, 0, - new StructType().add("count", LongType, nullable = shouldBeNullable), + new StructType().add("count", LongType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) @@ -325,7 +324,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { val schema3 = StateStoreColFamilySchema( "$rowCounter_listState", 0, keySchema, 0, - new StructType().add("count", LongType, nullable = shouldBeNullable), + new StructType().add("count", LongType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) @@ -409,7 +408,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { "valueStateTTL", 0, keySchema, 0, new StructType() - .add("value", new StructType().add("value", IntegerType, nullable = shouldBeNullable)) + .add("value", new StructType().add("value", IntegerType, nullable = true)) .add("ttlExpirationMs", LongType), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None @@ -418,7 +417,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { val schema10 = StateStoreColFamilySchema( "valueState", 0, keySchema, 0, - new StructType().add("value", IntegerType, nullable = shouldBeNullable), + new StructType().add("value", IntegerType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) @@ -428,7 +427,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { keySchema, 0, new StructType() .add("value", new StructType() - .add("id", LongType, nullable = shouldBeNullable) + .add("id", LongType, nullable = true) .add("name", StringType)) .add("ttlExpirationMs", LongType), Some(NoPrefixKeyStateEncoderSpec(keySchema)), From 570820a38776fdfd2609a4c4adb4c9f1b10edf68 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Sun, 2 Feb 2025 20:24:00 -0800 Subject: [PATCH 07/23] should be nullable --- .../apache/spark/sql/streaming/TransformWithStateSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 5652adb5c5e41..fd863208b473a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -1856,7 +1856,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest val schema3 = StateStoreColFamilySchema( "$rowCounter_listState", 0, keySchema, 0, - new StructType().add("count", LongType, nullable = shouldBeNullable), + new StructType().add("count", LongType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) From 6697723af9e6a8c7372737faa88ff04160b2fba3 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Sun, 2 Feb 2025 20:49:53 -0800 Subject: [PATCH 08/23] error-conditions --- .../src/main/resources/error/error-conditions.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8dd31595b5833..e2b80060c39f7 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4669,6 +4669,13 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : { + "message" : [ + "If schema evolution is enabled, all the fields in the schema for column family must be nullable.", + "Please make the schema nullable. Current schema: " + ], + "sqlState" : "XXKST" + }, "STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED" : { "message" : [ "The number of state schema files exceeds the maximum number of state schema files for this query: .", @@ -4715,13 +4722,6 @@ ], "sqlState" : "42K06" }, - "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : { - "message" : [ - "If schema evolution is enabled, all the fields in the schema for column family must be nullable.", - "Please make the schema nullable. Current schema: " - ], - "sqlState" : "XXKST" - }, "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : { "message" : [ "Provided value schema does not match existing state value schema.", From 8897e58994df67c1bd74873c81b6d25d31b0c234 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 4 Feb 2025 13:32:43 -0800 Subject: [PATCH 09/23] undoing other changes --- .../sql/execution/streaming/ListStateMetricsImpl.scala | 2 +- .../execution/streaming/StatefulProcessorHandleImpl.scala | 6 +----- .../spark/sql/streaming/TransformWithStateSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala index d5ffcba85719d..c9de2ad8e9f4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala @@ -38,7 +38,7 @@ trait ListStateMetricsImpl { // We keep track of the count of entries in the list in a separate column family // to avoid scanning the entire list to get the count. private val counterCFValueSchema: StructType = - StructType(Seq(StructField("count", LongType, nullable = false))) + StructType(Seq(StructField("count", LongType))) private val counterCFProjection = UnsafeProjection.create(counterCFValueSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 5ef0c0b76d93a..5b4bf3e3c1007 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -363,10 +363,6 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi addTimerColFamily() } - private def isInternal(columnFamilyName: String): Boolean = { - columnFamilyName.startsWith("_") || columnFamilyName.startsWith("$") - } - def getColumnFamilySchemas( shouldCheckNullable: Boolean ): Map[String, StateStoreColFamilySchema] = { @@ -374,7 +370,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi schemas.map { case (colFamilyName, schema) => // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => - if (!field.nullable && shouldCheckNullable && !isInternal(colFamilyName)) { + if (!field.nullable && shouldCheckNullable) { throw StateStoreErrors.stateStoreSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index fd863208b473a..1abc6d73b7b0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -2208,9 +2208,9 @@ class TransformWithStateSuite extends StateStoreMetricsTest t.asInstanceOf[SparkUnsupportedOperationException], condition = "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE", parameters = Map( - "storedValueSchema" -> "StructType(StructField(value,LongType,false))", + "storedValueSchema" -> "StructType(StructField(value,LongType,true))", "newValueSchema" -> - ("StructType(StructField(value,StructType(StructField(value,LongType,false))," + + ("StructType(StructField(value,StructType(StructField(value,LongType,true))," + "true),StructField(ttlExpirationMs,LongType,true))") ) ) From 84b5342cddda9a29381111c906093b69d0025936 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 4 Feb 2025 13:33:27 -0800 Subject: [PATCH 10/23] lint --- .../sql/tests/pandas/test_pandas_transform_with_state.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py index 6e872f8bb0dc0..7877bc083a194 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py @@ -1469,6 +1469,7 @@ def check_exception(error): df, check_exception=check_exception, ) + def test_not_nullable_fails(self): with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}): with tempfile.TemporaryDirectory() as checkpoint_dir: @@ -1493,14 +1494,16 @@ def check_exception(error): "[STATE_STORE_SCHEMA_MUST_BE_NULLABLE]" in error_msg and "column family state must be nullable" in error_msg ) + self._run_evolution_test( BasicProcessorNotNullable(), checkpoint_dir, check_basic_state, df, - check_exception=check_exception + check_exception=check_exception, ) + class SimpleStatefulProcessorWithInitialState(StatefulProcessor): # this dict is the same as input initial state dataframe dict = {("0",): 789, ("3",): 987} @@ -1922,6 +1925,7 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]: def close(self) -> None: pass + class BasicProcessorNotNullable(StatefulProcessor): # Schema definitions state_schema = StructType( From ac28140fcc457ce07320974ee23c2c7f74cfa507 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 4 Feb 2025 14:55:27 -0800 Subject: [PATCH 11/23] setting nullable to true --- .../streaming/StateStoreColumnFamilySchemaUtils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala index 4aef5553d5fd6..93d7ab845827c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala @@ -47,7 +47,7 @@ object StateStoreColumnFamilySchemaUtils { // Byte type is converted to Int in Avro, which doesn't work for us as Avro // uses zig-zag encoding as opposed to big-endian for Ints Seq( - StructField(s"${field.name}_marker", BinaryType, nullable = false), + StructField(s"${field.name}_marker", BinaryType), field.copy(name = s"${field.name}_value", BinaryType) ) } else { @@ -117,7 +117,7 @@ object StateStoreColumnFamilySchemaUtils { getRowCounterCFName(stateName), keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType, nullable = false))), + StructType(Seq(StructField("count", LongType))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(counterSchema.colFamilyName, counterSchema) @@ -149,7 +149,7 @@ object StateStoreColumnFamilySchemaUtils { keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType, nullable = false))), + StructType(Seq(StructField("count", LongType))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(countSchema.colFamilyName, countSchema) } From 409164f1201f5e96fa715ff90fca611d5b4b90e5 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 4 Feb 2025 14:57:09 -0800 Subject: [PATCH 12/23] setting more things to false --- .../sql/execution/streaming/StatefulProcessorHandleImpl.scala | 2 +- .../org/apache/spark/sql/execution/streaming/TTLState.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 5b4bf3e3c1007..805cc74483f37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -552,7 +552,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi elementKeySchema: StructType): StateStoreColFamilySchema = { val countIndexName = s"$$count_$stateName" val countValueSchema = StructType(Seq( - StructField("count", LongType, nullable = false) + StructField("count", LongType) )) StateStoreColFamilySchema( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala index b4449f99d6ba5..398088ab16978 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala @@ -357,7 +357,7 @@ abstract class OneToManyTTLState( // Schema of the entry count index: elementKey -> count private val COUNT_INDEX = "$count_" + stateName private val COUNT_INDEX_VALUE_SCHEMA: StructType = - StructType(Seq(StructField("count", LongType, nullable = false))) + StructType(Seq(StructField("count", LongType))) private val countIndexValueProjector = UnsafeProjection.create(COUNT_INDEX_VALUE_SCHEMA) // Reused internal row that we use to create an UnsafeRow with the schema of From 77a9c3f72e286aebf3e06e7fe9d6487910259b91 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 09:28:05 -0800 Subject: [PATCH 13/23] error stuff --- .../main/resources/error/error-conditions.json | 15 ++++++++------- .../pandas/test_pandas_transform_with_state.py | 2 +- .../streaming/StatefulProcessorHandleImpl.scala | 2 +- .../streaming/state/StateStoreErrors.scala | 10 +++++----- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 4b946858fa1a4..a083c51c5f85d 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4768,13 +4768,6 @@ ], "sqlState" : "42K06" }, - "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : { - "message" : [ - "If schema evolution is enabled, all the fields in the schema for column family must be nullable.", - "Please make the schema nullable. Current schema: " - ], - "sqlState" : "XXKST" - }, "STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED" : { "message" : [ "The number of state schema files exceeds the maximum number of state schema files for this query: .", @@ -5079,6 +5072,14 @@ ], "sqlState" : "42601" }, + "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { + "message" : [ + "If schema evolution is enabled, all the fields in the schema for column family must be nullable ", + "when using the TransformWithState operator.", + "Please make the schema nullable. Current schema: " + ], + "sqlState" : "XXKST" + }, "TRANSPOSE_EXCEED_ROW_LIMIT" : { "message" : [ "Number of rows exceeds the allowed limit of for TRANSPOSE. If this was intended, set to at least the current row count." diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py index 7877bc083a194..a9a0bbb31d492 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py @@ -1491,7 +1491,7 @@ def check_exception(error): error_msg = str(error) return ( - "[STATE_STORE_SCHEMA_MUST_BE_NULLABLE]" in error_msg + "[TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE]" in error_msg and "column family state must be nullable" in error_msg ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 805cc74483f37..72f653b7aea57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -371,7 +371,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => if (!field.nullable && shouldCheckNullable) { - throw StateStoreErrors.stateStoreSchemaMustBeNullable( + throw StateStoreErrors.twsSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 9dabd811c709d..188306e82f688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -145,10 +145,10 @@ object StateStoreErrors { new StateStoreValueSchemaNotCompatible(storedValueSchema, newValueSchema) } - def stateStoreSchemaMustBeNullable( + def twsSchemaMustBeNullable( columnFamilyName: String, - schema: String): StateStoreSchemaMustBeNullable = { - new StateStoreSchemaMustBeNullable(columnFamilyName, schema) + schema: String): TWSSchemaMustBeNullable = { + new TWSSchemaMustBeNullable(columnFamilyName, schema) } def stateStoreInvalidValueSchemaEvolution( @@ -352,11 +352,11 @@ class StateStoreValueSchemaNotCompatible( "storedValueSchema" -> storedValueSchema, "newValueSchema" -> newValueSchema)) -class StateStoreSchemaMustBeNullable( +class TWSSchemaMustBeNullable( columnFamilyName: String, schema: String) extends SparkUnsupportedOperationException( - errorClass = "STATE_STORE_SCHEMA_MUST_BE_NULLABLE", + errorClass = "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE", messageParameters = Map( "columnFamilyName" -> columnFamilyName, "schema" -> schema)) From 42affc07504cc40762bfefd8f1217280e81259f3 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 11:45:30 -0800 Subject: [PATCH 14/23] not passing in boolean --- .../execution/python/TransformWithStateInPandasExec.scala | 8 ++++---- .../sql/execution/streaming/TransformWithStateExec.scala | 8 ++++---- .../streaming/TransformWithStateVariableUtils.scala | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala index c2fdc82fde330..887a5d48207a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala @@ -114,10 +114,10 @@ case class TransformWithStateInPandasExec( override def operatorStateMetadataVersion: Int = 2 - override def getColFamilySchemas( - setNullableFields: Boolean - ): Map[String, StateStoreColFamilySchema] = { - driverProcessorHandle.getColumnFamilySchemas(true) + override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + // For Python, the user can explicitly set nullability on schema, so + // we need to throw an error if the schema is nullable + driverProcessorHandle.getColumnFamilySchemas(shouldCheckNullable = true) } override def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index fd430d3b69d69..7a8499f2250f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -139,9 +139,7 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - override def getColFamilySchemas( - setNullableFields: Boolean - ): Map[String, StateStoreColFamilySchema] = { + override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val keySchema = keyExpressions.toStructType // we have to add the default column family schema because the RocksDBStateEncoder // expects this entry to be present in the stateSchemaProvider. @@ -149,8 +147,10 @@ case class TransformWithStateExec( 0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA, Some(NoPrefixKeyStateEncoderSpec(keySchema))) + // For Scala, the user can't explicitly set nullability on schema, so there is + // no reason to throw an error, and we can simply set the schema to nullable. val columnFamilySchemas = getDriverProcessorHandle() - .getColumnFamilySchemas(false) ++ + .getColumnFamilySchemas(shouldCheckNullable = false) ++ Map(StateStore.DEFAULT_COL_FAMILY_NAME -> defaultSchema) closeProcessorHandle() columnFamilySchemas diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala index f49a5cde63c03..8d604aa4cf731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala @@ -175,7 +175,7 @@ object TransformWithStateOperatorProperties extends Logging { */ trait TransformWithStateMetadataUtils extends Logging { - def getColFamilySchemas(setNullableFields: Boolean): Map[String, StateStoreColFamilySchema] + def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] @@ -209,7 +209,7 @@ trait TransformWithStateMetadataUtils extends Logging { stateStoreEncodingFormat: String = UnsafeRow.toString): List[StateSchemaValidationResult] = { assert(stateSchemaVersion >= 3) val usingAvro = stateStoreEncodingFormat == Avro.toString - val newSchemas = getColFamilySchemas(usingAvro) + val newSchemas = getColFamilySchemas() val stateSchemaDir = stateSchemaDirPath(info) val newStateSchemaFilePath = new Path(stateSchemaDir, s"${batchId}_${UUID.randomUUID().toString}") From 9b947cb7a0797c77b2d5517ba40a6ef05d0d2d34 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 13:03:45 -0800 Subject: [PATCH 15/23] setting vars --- .../TransformWithStateInPandasExec.scala | 9 +++++-- .../StatefulProcessorHandleImpl.scala | 25 +++++++++++++++---- .../streaming/TransformWithStateExec.scala | 7 ++++-- .../TransformWithStateVariableUtils.scala | 6 +++-- 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala index 887a5d48207a8..c5a2a54c3f6a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala @@ -114,10 +114,15 @@ case class TransformWithStateInPandasExec( override def operatorStateMetadataVersion: Int = 2 - override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + override def getColFamilySchemas( + shouldBeNullable: Boolean + ): Map[String, StateStoreColFamilySchema] = { // For Python, the user can explicitly set nullability on schema, so // we need to throw an error if the schema is nullable - driverProcessorHandle.getColumnFamilySchemas(shouldCheckNullable = true) + driverProcessorHandle.getColumnFamilySchemas( + shouldCheckNullable = shouldBeNullable, + shouldSetNullable = shouldBeNullable + ) } override def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 72f653b7aea57..a6a2a3612186a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -363,21 +363,36 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi addTimerColFamily() } + /** + * This method returns all column family schemas, and checks and enforces nullability + * if need be. + * @param shouldCheckNullable Whether we need to check the nullability. This is set to + * true when using Python, as this is the only avenue through + * which users can set nullability + * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to + * true when using Scala, as case classes are set to non-nullable + * by default. + * @return column family schemas used by this stateful processor. + */ def getColumnFamilySchemas( - shouldCheckNullable: Boolean + shouldCheckNullable: Boolean, + shouldSetNullable: Boolean ): Map[String, StateStoreColFamilySchema] = { val schemas = columnFamilySchemas.toMap schemas.map { case (colFamilyName, schema) => - // assert that each field is nullable if schema evolution is enabled schema.valueSchema.fields.foreach { field => if (!field.nullable && shouldCheckNullable) { throw StateStoreErrors.twsSchemaMustBeNullable( schema.colFamilyName, schema.valueSchema.toString()) } } - colFamilyName -> schema.copy( - valueSchema = schema.valueSchema.toNullable - ) + if (shouldSetNullable) { + colFamilyName -> schema.copy( + valueSchema = schema.valueSchema.toNullable + ) + } else { + colFamilyName -> schema + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 7a8499f2250f5..6e0502e186597 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -139,7 +139,9 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + override def getColFamilySchemas( + shouldBeNullable: Boolean + ): Map[String, StateStoreColFamilySchema] = { val keySchema = keyExpressions.toStructType // we have to add the default column family schema because the RocksDBStateEncoder // expects this entry to be present in the stateSchemaProvider. @@ -150,7 +152,8 @@ case class TransformWithStateExec( // For Scala, the user can't explicitly set nullability on schema, so there is // no reason to throw an error, and we can simply set the schema to nullable. val columnFamilySchemas = getDriverProcessorHandle() - .getColumnFamilySchemas(shouldCheckNullable = false) ++ + .getColumnFamilySchemas( + shouldCheckNullable = false, shouldSetNullable = shouldBeNullable) ++ Map(StateStore.DEFAULT_COL_FAMILY_NAME -> defaultSchema) closeProcessorHandle() columnFamilySchemas diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala index 8d604aa4cf731..021a6fa1ecbdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala @@ -175,7 +175,9 @@ object TransformWithStateOperatorProperties extends Logging { */ trait TransformWithStateMetadataUtils extends Logging { - def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] + // This method will return the column family schemas, and check whether the fields in the + // schema are nullable. If Avro encoding is used, we want to enforce nullability + def getColFamilySchemas(shouldBeNullable: Boolean): Map[String, StateStoreColFamilySchema] def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo] @@ -209,7 +211,7 @@ trait TransformWithStateMetadataUtils extends Logging { stateStoreEncodingFormat: String = UnsafeRow.toString): List[StateSchemaValidationResult] = { assert(stateSchemaVersion >= 3) val usingAvro = stateStoreEncodingFormat == Avro.toString - val newSchemas = getColFamilySchemas() + val newSchemas = getColFamilySchemas(usingAvro) val stateSchemaDir = stateSchemaDirPath(info) val newStateSchemaFilePath = new Path(stateSchemaDir, s"${batchId}_${UUID.randomUUID().toString}") From 5452b689ea1ffa9856b47a7080e720f9ba6510fa Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 13:12:01 -0800 Subject: [PATCH 16/23] comment --- .../execution/streaming/StatefulProcessorHandleImpl.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index a6a2a3612186a..31aeef498e71d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -365,13 +365,13 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi /** * This method returns all column family schemas, and checks and enforces nullability - * if need be. + * if need be. The nullability check and set is only set to true when Avro is enabled. * @param shouldCheckNullable Whether we need to check the nullability. This is set to * true when using Python, as this is the only avenue through * which users can set nullability * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to - * true when using Scala, as case classes are set to non-nullable - * by default. + * true when using Scala, as case classes are set to + * non-nullable by default. * @return column family schemas used by this stateful processor. */ def getColumnFamilySchemas( From ade70748fba4f5115da6d75ea6995eebb87bb64c Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 13:14:32 -0800 Subject: [PATCH 17/23] explicitly setting nullable = true --- .../sql/execution/streaming/ListStateMetricsImpl.scala | 2 +- .../streaming/StateStoreColumnFamilySchemaUtils.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala index c9de2ad8e9f4f..66b6bd063ae6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala @@ -38,7 +38,7 @@ trait ListStateMetricsImpl { // We keep track of the count of entries in the list in a separate column family // to avoid scanning the entire list to get the count. private val counterCFValueSchema: StructType = - StructType(Seq(StructField("count", LongType))) + StructType(Seq(StructField("count", LongType, nullable = true))) private val counterCFProjection = UnsafeProjection.create(counterCFValueSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala index 93d7ab845827c..4401f8cedff6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala @@ -47,7 +47,7 @@ object StateStoreColumnFamilySchemaUtils { // Byte type is converted to Int in Avro, which doesn't work for us as Avro // uses zig-zag encoding as opposed to big-endian for Ints Seq( - StructField(s"${field.name}_marker", BinaryType), + StructField(s"${field.name}_marker", BinaryType, nullable = true), field.copy(name = s"${field.name}_value", BinaryType) ) } else { @@ -117,7 +117,7 @@ object StateStoreColumnFamilySchemaUtils { getRowCounterCFName(stateName), keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType))), + StructType(Seq(StructField("count", LongType, nullable = true))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(counterSchema.colFamilyName, counterSchema) @@ -149,7 +149,7 @@ object StateStoreColumnFamilySchemaUtils { keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType))), + StructType(Seq(StructField("count", LongType, nullable = true))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(countSchema.colFamilyName, countSchema) } From 4ef646825ba686602189bc0b3bd5613a64ef449f Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 15:55:02 -0800 Subject: [PATCH 18/23] spacing in error-conditions.json --- common/utils/src/main/resources/error/error-conditions.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a083c51c5f85d..0a84805082a63 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5074,9 +5074,9 @@ }, "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ - "If schema evolution is enabled, all the fields in the schema for column family must be nullable ", - "when using the TransformWithState operator.", - "Please make the schema nullable. Current schema: " + "If schema evolution is enabled, all the fields in the schema for column family must be nullable", + " when using the TransformWithState operator.", + " Please make the schema nullable. Current schema: " ], "sqlState" : "XXKST" }, From ce8bb14a853b0424f6ecc1092b32fe4caf15078d Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 22:50:50 -0800 Subject: [PATCH 19/23] msg update --- common/utils/src/main/resources/error/error-conditions.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 0a84805082a63..dc248f064e8d4 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5074,7 +5074,7 @@ }, "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ - "If schema evolution is enabled, all the fields in the schema for column family must be nullable", + "If Avro encoding is enabled, all the fields in the schema for column family must be nullable", " when using the TransformWithState operator.", " Please make the schema nullable. Current schema: " ], From f0b3d8ca44e51f98abc25f4f43b450b46e61a2dd Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 22:59:36 -0800 Subject: [PATCH 20/23] comment update --- .../sql/execution/streaming/StatefulProcessorHandleImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 31aeef498e71d..aeeffa5f544a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -370,8 +370,8 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi * true when using Python, as this is the only avenue through * which users can set nullability * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to - * true when using Scala, as case classes are set to - * non-nullable by default. + * true when using Scala, as primitive type encoders set the field + * to non-nullable by default * @return column family schemas used by this stateful processor. */ def getColumnFamilySchemas( From 2f33bb4619adecf4bfdfdb9af5e274a2b222cced Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 23:02:14 -0800 Subject: [PATCH 21/23] more comments --- .../sql/execution/streaming/StatefulProcessorHandleImpl.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index aeeffa5f544a4..f1f0ddf206c60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -371,7 +371,9 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi * which users can set nullability * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to * true when using Scala, as primitive type encoders set the field - * to non-nullable by default + * to non-nullable. Changing fields from non-nullable to nullable + * does not break anything (and is required for Avro encoding), so + * we can safely make this change. * @return column family schemas used by this stateful processor. */ def getColumnFamilySchemas( From a28aba8630b970650a05bd71adca17df6fc297d3 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 7 Feb 2025 09:14:44 -0800 Subject: [PATCH 22/23] fixing stuff --- common/utils/src/main/resources/error/error-conditions.json | 4 ++-- .../apache/spark/sql/streaming/TransformWithStateSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index dc248f064e8d4..6fbdf45722da4 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5075,8 +5075,8 @@ "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ "If Avro encoding is enabled, all the fields in the schema for column family must be nullable", - " when using the TransformWithState operator.", - " Please make the schema nullable. Current schema: " + "when using the TransformWithState operator.", + "Please make the schema nullable. Current schema: " ], "sqlState" : "XXKST" }, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 6c5f3aa4ec7d1..e98e47cae427d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -1406,7 +1406,7 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest val schema3 = StateStoreColFamilySchema( "$rowCounter_listState", 0, keySchema, 0, - new StructType().add("count", LongType, nullable = shouldBeNullable), + new StructType().add("count", LongType, nullable = true), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) From d628b8d6cf6a6ee21ae6309d009de91d954f1557 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 7 Feb 2025 12:14:40 -0800 Subject: [PATCH 23/23] fixing test --- .../sql/streaming/TransformWithValueStateTTLSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala index 634a1611e234f..4c682b18eef8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala @@ -273,6 +273,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { ) { withTempDir { checkpointDir => // When Avro is used, we want to set the StructFields to nullable + val shouldBeNullable = usingAvroEncoding() val metadataPathPostfix = "state/0/_stateSchema/default" val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") val hadoopConf = spark.sessionState.newHadoopConf() @@ -408,7 +409,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { "valueStateTTL", 0, keySchema, 0, new StructType() - .add("value", new StructType().add("value", IntegerType, nullable = true)) + .add("value", new StructType().add("value", IntegerType, nullable = shouldBeNullable)) .add("ttlExpirationMs", LongType), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None @@ -417,7 +418,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { val schema10 = StateStoreColFamilySchema( "valueState", 0, keySchema, 0, - new StructType().add("value", IntegerType, nullable = true), + new StructType().add("value", IntegerType, nullable = shouldBeNullable), Some(NoPrefixKeyStateEncoderSpec(keySchema)), None ) @@ -427,7 +428,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { keySchema, 0, new StructType() .add("value", new StructType() - .add("id", LongType, nullable = true) + .add("id", LongType, nullable = shouldBeNullable) .add("name", StringType)) .add("ttlExpirationMs", LongType), Some(NoPrefixKeyStateEncoderSpec(keySchema)),