[SPARK-51065][SQL] Disallowing non-nullable schema when Avro encoding is used for TransformWithState #49751
Conversation
@ericm-db - can you add the SPARK ticket to the PR title?
@ericm-db - also, is the test failure related to the change?
@HeartSaVioR Can you PTAL when you get a chance?
First pass. I feel like I don't fully understand the full picture of this, so I need to get answers to my review comments.
@@ -1470,6 +1470,39 @@ def check_exception(error):
            check_exception=check_exception,
        )

    def test_not_nullable_fails(self):
Why not have an identical test in Scala as well? I don't see a new test verifying the error.
The thing is, there is no way for a user to specify this using Scala.
Yes, and probably also no.
I agree moderate users may not ever try to get around this and will just stick with a case class or POJO. But "we" can imagine a way to get around it, exactly the same way we support PySpark:

override protected val stateEncoder: ExpressionEncoder[Any] =
  ExpressionEncoder(stateType).resolveAndBind().asInstanceOf[ExpressionEncoder[Any]]

This is how we come up with the state encoder for the Python version of FMGWS. This does serde via the Row interface - my rough memory says it's not InternalRow but Row, so it will most likely work with GenericRow, but we can try both GenericRow and InternalRow.
I'm OK with deferring this as a follow-up.
Now I get that you are not actually able to test this, as we have to just accept the non-nullable column and change it to nullable. Again, I doubt this is just a bug though.
@@ -47,7 +47,7 @@ object StateStoreColumnFamilySchemaUtils {
    // Byte type is converted to Int in Avro, which doesn't work for us as Avro
    // uses zig-zag encoding as opposed to big-endian for Ints
    Seq(
      StructField(s"${field.name}_marker", BinaryType, nullable = false),
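The zig-zag point in the comment above can be illustrated outside Spark. Here is a minimal Python sketch (hypothetical helper names, not Spark or Avro library code) of why Avro's zig-zag varint encoding does not preserve byte-wise ordering the way fixed-width big-endian encoding does:

```python
def avro_zigzag(n: int) -> int:
    # Avro maps signed 32-bit ints to unsigned via zig-zag: 0, -1, 1, -2 -> 0, 1, 2, 3
    return (n << 1) ^ (n >> 31)

def varint(n: int) -> bytes:
    # Avro's variable-length encoding: 7 bits per byte, high bit = continuation
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def avro_int(n: int) -> bytes:
    return varint(avro_zigzag(n))

def big_endian_int(n: int) -> bytes:
    return n.to_bytes(4, "big", signed=True)

# Byte-wise comparison of the Avro encodings can disagree with numeric order,
# while fixed-width big-endian preserves it (for non-negative values):
assert avro_int(100) > avro_int(130)              # order NOT preserved
assert big_endian_int(100) < big_endian_int(130)  # order preserved
```

This is why a raw Byte cannot simply be round-tripped through Avro's Int type if the encoded bytes are expected to compare in value order.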
Let's say nullable = true explicitly?
I see the divergence comes from the fact that the Encoder for a case class gives non-nullable columns as the schema definition, and I wonder whether this is really correct behavior.
I'd suggest experimenting with a case class, explicitly giving null in a field, and seeing whether it is really safe or we end up with an NPE. If it's the latter, it's definitely a bug we need to fix. We can do this as a follow-up, but maybe before the Spark 4.0 release, as it is a bit weird to me.
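To make the coercion being discussed concrete, here is a toy sketch with plain Python dataclasses standing in for Spark's StructType/StructField (the names and model are hypothetical, not Spark's API):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Field:
    # Toy stand-in for Spark's StructField
    name: str
    data_type: str
    nullable: bool
    children: tuple = ()  # non-empty for struct-typed fields

def to_nullable(fields: tuple) -> tuple:
    # Recursively force every field (including nested struct fields) to
    # nullable=True, roughly the coercion applied to case-class schemas.
    return tuple(
        replace(f, nullable=True, children=to_nullable(f.children))
        for f in fields
    )

# A case-class-derived schema typically reports primitive fields as non-nullable:
schema = (
    Field("count", "long", nullable=False),
    Field("info", "struct", nullable=False,
          children=(Field("ts", "long", nullable=False),)),
)
fixed = to_nullable(schema)
assert all(f.nullable for f in fixed)
assert fixed[1].children[0].nullable
```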
@@ -149,8 +149,11 @@ case class TransformWithStateExec(
      0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA,
      Some(NoPrefixKeyStateEncoderSpec(keySchema)))

    // For Scala, the user can't explicitly set nullability on schema, so there is
As I mentioned in my other comment, it is not impossible to set nullability on the encoder (although I tend to agree most users won't). Let's not make this conditional.
Also, this concerns me: if we are very confident that users would never be able to set a column to be nullable, why do we need to change the schema when we all know it has to be nullable? What are we worried about if we just do the same thing as for Python?
#49751 (comment)
I realized you had to go through this way due to the case class encoder. Sorry about that.
@@ -5072,6 +5072,14 @@
    ],
    "sqlState" : "42601"
  },
  "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : {
    "message" : [
      "If schema evolution is enabled, all the fields in the schema for column family <columnFamilyName> must be nullable",
Which do we think is easier to understand, "using Avro" or "schema evolution is enabled"?
I foresee the direction of using Avro for all stateful operators (unless there is an outstanding regression), and once we make Avro the default, this message will be confusing to consume because users aren't doing anything with schema evolution. IMO it is "indirect" information, and they would probably try to figure out how to disable schema evolution instead, without knowing that Avro and schema evolution are coupled.
cc. @anishshri-db to hear his voice.
Yeah, I think it's fine to say that we refer to the transformWithState case relative to Avro being used - we don't need to explicitly call out schema evolution here.
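For illustration, the validation path behind this error condition amounts to something like the following hypothetical Python sketch (mirroring the spirit of the new TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE check, not the actual Spark implementation):

```python
def assert_all_nullable(fields, column_family: str) -> None:
    # Reject a column-family schema containing non-nullable fields when Avro
    # encoding is in use; `fields` is a list of (name, nullable) pairs.
    bad = [name for name, nullable in fields if not nullable]
    if bad:
        raise ValueError(
            f"[TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE] All fields in the "
            f"schema for column family {column_family} must be nullable when "
            f"Avro encoding is used; non-nullable fields: {bad}"
        )

assert_all_nullable([("count", True)], "countState")  # passes silently
try:
    assert_all_nullable([("count", False)], "countState")
    raise AssertionError("expected a schema error")
except ValueError as e:
    assert "countState" in str(e)
```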
 * true when using Python, as this is the only avenue through
 * which users can set nullability
 * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to
 *                          true when using Scala, as case classes are set to
case classes are set to non-nullable by default.
I'm actually surprised, and it sounds like a bug to me. (Sorry, you had to handle Python and Scala differently due to this. My bad.)
What if you set null to any of the fields in a case class? Will it work, and if it works, how?
If this is indeed a bug and we can fix it, then we can simplify things a lot. I'm OK if you want to defer this, but we definitely need a follow-up ticket for it.
@@ -145,6 +145,12 @@ object StateStoreErrors {
    new StateStoreValueSchemaNotCompatible(storedValueSchema, newValueSchema)
  }

  def twsSchemaMustBeNullable(
I think TWS deserves its own error collection class, but I agree this is out of scope. Let's make a follow-up.
What changes were proposed in this pull request?
Right now, we effectively set all fields in a schema to nullable, regardless of what the user specifies.
Why are the changes needed?
To keep parity between the user-specified schema and the actual schema that we use, and to enable the schema evolution use cases we want.
Does this PR introduce any user-facing change?
Yes - an error is now thrown if the schema is defined as non-nullable.
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No