@@ -47,7 +47,7 @@ object StateStoreColumnFamilySchemaUtils {
47
47
// Byte type is converted to Int in Avro, which doesn't work for us as Avro
48
48
// uses zig-zag encoding as opposed to big-endian for Ints
49
49
Seq (
50
- StructField (s " ${field.name}_marker " , BinaryType ),
50
+ StructField (s " ${field.name}_marker " , BinaryType , nullable = true ),
51
51
field.copy(name = s " ${field.name}_value " , BinaryType )
52
52
)
53
53
} else {
@@ -117,7 +117,7 @@ object StateStoreColumnFamilySchemaUtils {
117
117
getRowCounterCFName(stateName), keySchemaId = 0 ,
118
118
keyEncoder.schema,
119
119
valueSchemaId = 0 ,
120
- StructType (Seq (StructField (" count" , LongType ))),
120
+ StructType (Seq (StructField (" count" , LongType , nullable = true ))),
121
121
Some (NoPrefixKeyStateEncoderSpec (keyEncoder.schema)))
122
122
schemas.put(counterSchema.colFamilyName, counterSchema)
123
123
@@ -149,7 +149,7 @@ object StateStoreColumnFamilySchemaUtils {
149
149
keySchemaId = 0 ,
150
150
keyEncoder.schema,
151
151
valueSchemaId = 0 ,
152
- StructType (Seq (StructField (" count" , LongType ))),
152
+ StructType (Seq (StructField (" count" , LongType , nullable = true ))),
153
153
Some (NoPrefixKeyStateEncoderSpec (keyEncoder.schema)))
154
154
schemas.put(countSchema.colFamilyName, countSchema)
155
155
}
0 commit comments