[SPARK-33863][PYTHON] Respect session timezone in udf workers #53161
base: master
Conversation
@cloud-fan, @ueshin, @zhengruifeng we've discussed this but did not reach a conclusion. I had a draft here and a few questions. We probably need to discuss the implementation and its implications further.
zhengruifeng left a comment:
It is a behavior change, so I think we need a flag for it.
Also, we need new tests in test_udf.py.
# Use the local timezone to convert the timestamp
tz = datetime.datetime.now().astimezone().tzinfo
tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
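Not part of the diff above - just a minimal sketch, assuming SPARK_SESSION_LOCAL_TIMEZONE carries an IANA zone name, of how a worker could resolve the timezone and fall back to the original machine-local behavior:

```python
import datetime
import os
from zoneinfo import ZoneInfo  # stdlib since Python 3.9


def resolve_timezone():
    """Prefer the session timezone forwarded by the JVM, else the local one."""
    tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE")
    if tzname:
        return ZoneInfo(tzname)  # e.g. "America/Los_Angeles"
    # Original behavior: use the machine-local timezone.
    return datetime.datetime.now().astimezone().tzinfo
```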
To confirm, we will hit this branch for every udf execution, not just once per python worker initialization, right?
That's correct, but it doesn't seem like spark.session.conf.set("spark.sql.session.timeZone") impacts the result. This only works when I create the session with the conf. Any ideas? I can investigate whether that's an issue or something we just need to understand - I thought you might see it immediately.
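For anyone reproducing this, the two ways of setting the config being compared here are roughly the following (illustrative sketch, not code from the PR):

```python
from pyspark.sql import SparkSession

# Set at session creation time - the case that reportedly works.
spark = (
    SparkSession.builder
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

# Changed at runtime - the case that reportedly does not affect the UDF result.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
```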
@gaogaotiantian We can get the runtime config the same way as the other configs, like hideTraceback or simplifiedTraceback above. Please take a look at PythonRunner and its subclasses.
Ah, so basically override this for every subclassed worker?
Yes. Also if we have a flag, the subclasses should decide whether it returns the session local timezone or None.
So should the flag be a conf at the same level as the session local timezone, or just at the Python UDF level? Will it default to the original behavior or the new behavior?
Yes, the flag should be at the same level as the session local timezone, a runtime conf in SQLConf.
It can be enabled by default, but when disabled, it should fall back to the original behavior.
WDYT? cc @zhengruifeng @HyukjinKwon
The Arrow-based UDFs already handle the session local timezone, so it may be OK to just update BasePythonUDFRunner.envVars to have the env var there, instead of in PythonRunner?
| envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1") | ||
| } | ||
| if (sessionLocalTimeZone.isDefined) { | ||
| envVars.put("SPARK_SESSION_LOCAL_TIMEZONE", sessionLocalTimeZone.get) |
For Arrow-based UDFs, sessionLocalTimeZone is actually already passed to the Python side:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (line 153 in ed23cc3):

val timeZoneConf = Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)

However, this workerConf is not available in vanilla Python UDFs; we could consider supporting it there in the future. Also cc @HeartSaVioR
Yeah, it's better to pass the configs via a proper protocol instead of environment variables, but that's already how the vanilla Python runner works and I think it's fine to follow it.
I agree it's ideal, but I guess it'd need a non-trivial code change.
Can we add a test which sets the session timezone?
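A rough sketch of what such a test could look like in test_udf.py (hypothetical names and assertions, assuming runtime changes to the conf reach the worker under this PR):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def check_session_timezone_in_udf(spark):
    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    render = udf(lambda ts: ts.isoformat(), StringType())
    # The timestamp literal below is interpreted in the session timezone.
    df = spark.sql("SELECT timestamp'1990-08-10 00:00:00' AS ts")
    # If the worker respects the session timezone, the datetime it receives
    # renders the same wall-clock time regardless of the machine's timezone.
    assert df.select(render("ts")).first()[0] == "1990-08-10T00:00:00"
```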
Okay, now I've rethought this, and I think our handling of TimestampType is ambiguous at its core. The key point is that it's not possible to determine which timezone to use when we get a naive datetime and try to convert it to a timezone-aware data type. The only way I think we can go is to enforce timezone-aware datetimes for TimestampType. If we can get the current session timezone when we try to convert, we can at least do it consistently. I don't want to fix this so that it's correct in some cases but becomes wrong in others. We should make a decision about whether we want to fix it (with the risk of breaking user code). If not, that's okay - we can have a timestamp system that sometimes works.
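To make the ambiguity concrete, the same naive datetime denotes different instants depending on which timezone we assume when converting it (illustrative example, not from the PR):

```python
import datetime
from zoneinfo import ZoneInfo

naive = datetime.datetime(1990, 8, 10, 0, 0)
as_utc = naive.replace(tzinfo=datetime.timezone.utc)
as_la = naive.replace(tzinfo=ZoneInfo("America/Los_Angeles"))

print(as_utc.timestamp())  # 650246400.0
print(as_la.timestamp())   # 650271600.0 - seven hours later (PDT is UTC-7)
```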
Where do we do the conversion? At least for the UDF case, the conversion should all happen within an active query, which belongs to a session?
We have to do it everywhere.

df = spark.createDataFrame([(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"])

Here we are trying to create a TimestampType column from a naive datetime. There are two correct ways to do this:

1. Respect the session timezone when converting the naive datetime (and do that everywhere we convert).
2. Always map TimestampType to timezone-aware datetime objects.
@gaogaotiantian The key property of Spark TimestampType is that it represents an instant: internally it's microseconds since the epoch, and the session timezone only matters when rendering or parsing a wall-clock value. A similar example is reading a JDBC table that contains a column with the standard TIMESTAMP WITH TIMEZONE type. Each value can have a different timezone, but it's still OK to read it as Spark TimestampType. Under the hood, every value is normalized to the same internal representation of an instant.
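A small illustration of the "same instant, different timezone" point - two aware datetimes in different zones can denote exactly the same instant, and therefore the same timestamp value:

```python
import datetime
from zoneinfo import ZoneInfo

t_utc = datetime.datetime(1990, 8, 10, 7, 0, tzinfo=datetime.timezone.utc)
t_la = datetime.datetime(1990, 8, 10, 0, 0, tzinfo=ZoneInfo("America/Los_Angeles"))

assert t_utc == t_la                          # same instant
assert t_utc.timestamp() == t_la.timestamp()  # same epoch seconds
```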
@cloud-fan, I understand that, but consider this:

spark.conf.set("spark.sql.session.timeZone", "UTC")
df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc),),
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])
df.show()

The two rows above end up with different values - because we do not respect the session timezone when converting them. Notice that no UDF is involved at this point.

spark/python/pyspark/sql/types.py, line 441 in 0908ec5:
We don't check timezone info in that conversion. We can fix that with some hacks, if we really want to - that's the option 1 I mentioned above. Again, we need to do it everywhere. However, that is not the full picture. We have an even worse issue with UDFs:

@udf(returnType=BooleanType())
def greater(ts):
    return ts > datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)

The code above will raise an error, because we convert the timestamp to a naive datetime, and Python refuses to compare a naive datetime with an aware one. Also I found an issue that is probably DST-related:

@udf(returnType=BooleanType())
def same(ts):
    return ts == datetime.datetime(1990, 8, 10, 0, 0)

df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])
df.select(same("ts")).show()

Even this returns false. Back to my point - our timestamp conversion simply isn't consistent. This is why I propose my second option - it's a bit aggressive, but we can make it right - always map TimestampType to timezone-aware datetime objects. However, there is a risk that some existing user code can break. If that's a concern, we can just leave this broken; it still works in some occasions.
Ah, this is tough. I agree with "always map TimestampType with aware datetime", but it can be a breaking change to Python UDFs, as it's not only a data change but also a type change (it's illegal to compare a naive timestamp with an aware timestamp in Python). How about Arrow/pandas? Do they also rely on datetime objects?
Yeah, this could be a breaking change, but it is the correct way to go. I don't have the best knowledge of pandas, but it seems like they have similar concerns - https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html. I mean, we can't really make it work properly if we mix naive and aware datetimes. I can think of a few ways to make it less painful.

I agree this could be disruptive, but otherwise we can't make it right - that's the problem. It's a whole big mess internally and we simply can't make it better while keeping backward compatibility.
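A hypothetical sketch of how existing UDF code would have to adapt if TimestampType values started arriving as timezone-aware datetimes (the function names here are made up for illustration):

```python
import datetime

# Before: the worker hands the UDF a naive datetime, so users compare
# against naive literals.
def greater_naive(ts):
    return ts > datetime.datetime(1990, 8, 10, 0, 0)

# After: the value would be timezone-aware, so the literal needs a zone too;
# otherwise Python raises "can't compare offset-naive and offset-aware datetimes".
def greater_aware(ts):
    return ts > datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)
```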
What changes were proposed in this pull request?
Respect spark.sql.session.timeZone in UDF workers. This is discussed in #52980 but we decided to move it to a separate PR. There are still open questions left, e.g. setting the config at runtime with spark.conf.set does not seem to take effect. I believe this is trivial to people who are familiar with the configs, so I did not investigate too much.

Why are the changes needed?
Relying on the timezone of the local machine does not make any sense.
Does this PR introduce any user-facing change?
Yes. The UDF behavior regarding timestamps and timezones will change.
How was this patch tested?
Manually
Was this patch authored or co-authored using generative AI tooling?
No