From ade70748fba4f5115da6d75ea6995eebb87bb64c Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 6 Feb 2025 13:14:32 -0800 Subject: [PATCH] explicitly setting nullable = true --- .../sql/execution/streaming/ListStateMetricsImpl.scala | 2 +- .../streaming/StateStoreColumnFamilySchemaUtils.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala index c9de2ad8e9f4f..66b6bd063ae6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala @@ -38,7 +38,7 @@ trait ListStateMetricsImpl { // We keep track of the count of entries in the list in a separate column family // to avoid scanning the entire list to get the count. private val counterCFValueSchema: StructType = - StructType(Seq(StructField("count", LongType))) + StructType(Seq(StructField("count", LongType, nullable = true))) private val counterCFProjection = UnsafeProjection.create(counterCFValueSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala index 93d7ab845827c..4401f8cedff6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala @@ -47,7 +47,7 @@ object StateStoreColumnFamilySchemaUtils { // Byte type is converted to Int in Avro, which doesn't work for us as Avro // uses zig-zag encoding as opposed to big-endian for Ints Seq( - StructField(s"${field.name}_marker", BinaryType), + StructField(s"${field.name}_marker", BinaryType, nullable = true), field.copy(name = s"${field.name}_value", BinaryType) ) } else { @@ -117,7 +117,7 @@ object StateStoreColumnFamilySchemaUtils { getRowCounterCFName(stateName), keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType))), + StructType(Seq(StructField("count", LongType, nullable = true))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(counterSchema.colFamilyName, counterSchema) @@ -149,7 +149,7 @@ object StateStoreColumnFamilySchemaUtils { keySchemaId = 0, keyEncoder.schema, valueSchemaId = 0, - StructType(Seq(StructField("count", LongType))), + StructType(Seq(StructField("count", LongType, nullable = true))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) schemas.put(countSchema.colFamilyName, countSchema) }