[SPARK-48516][PYTHON][CONNECT] Turn on Arrow optimization for Python UDFs by default #49482
base: master
Conversation
Reviewed diff (context):

```py
@@ -71,6 +71,15 @@
    pass

has_arrow: bool = False
```
I think we can use `from pyspark.testing.utils import have_pyarrow`.
Can we address this comment, @xinrong-meng?
Good point! Resolved, thank you
There will be a circular import if we do that. Let me follow up with a separate PR instead.
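For illustration, here is a minimal sketch of the local availability probe that a flag like `has_arrow` presumably relies on, assuming the standard try/except pattern; this avoids importing `have_pyarrow` from `pyspark.testing.utils` and thereby the circular import. Only `has_arrow` comes from the diff above; the rest is illustrative.

```py
# Sketch (assumed, not the exact diff): probe PyArrow locally so this module
# does not need to import have_pyarrow from pyspark.testing.utils, which
# would create a circular import here.
has_arrow: bool = False
try:
    import pyarrow  # noqa: F401

    has_arrow = True
except ImportError:
    pass
```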
It seems that there are many unit test failures unfortunately, @xinrong-meng.

Feel free to take your time. I believe the community agrees that this is worthwhile for Apache Spark 4.0.0. We can backport this when the PR is ready.

Let's make CIs pass first.

Gentle ping, @xinrong-meng. If this is targeting Apache Spark 4.0, we had better have this before February 1st.
Thanks @dongjoon-hyun, I just got some free cycles for this and will resolve it ASAP.

A quick update: the PR is blocked by UDT support in Arrow Python UDFs, which I'm currently working on.

Thank you for the updated context.
Given the status, I believe this contribution is better suited for Apache Spark 4.1.0, because we need more testing and community verification.
Thanks for your attention, @dongjoon-hyun! The current proposal is to fall back to the existing (non-Arrow-optimized) Python UDF when a UDT is involved. My understanding is that no further testing is needed and the code change is minimal (just an if-else, as sketched below), but I respect the community's decision.
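For illustration, a minimal sketch of the if-else fallback being proposed; `contains_udt` and `resolve_use_arrow` are hypothetical names, and the actual check and warning live on the Spark side (the `ExtractPythonUDFs` rule, per the logs in the commit message below).

```py
from pyspark.sql.types import ArrayType, DataType, MapType, StructType, UserDefinedType


def contains_udt(dt: DataType) -> bool:
    """Recursively check whether a type, or any type nested in it, is a UDT."""
    if isinstance(dt, UserDefinedType):
        return True
    if isinstance(dt, StructType):
        return any(contains_udt(f.dataType) for f in dt.fields)
    if isinstance(dt, ArrayType):
        return contains_udt(dt.elementType)
    if isinstance(dt, MapType):
        return contains_udt(dt.keyType) or contains_udt(dt.valueType)
    return False


def resolve_use_arrow(use_arrow: bool, input_types, return_type) -> bool:
    """Fall back to the pickled (non-Arrow) UDF path when UDTs are involved."""
    if use_arrow and (contains_udt(return_type) or any(contains_udt(t) for t in input_types)):
        # In Spark, this is where the ExtractPythonUDFs warning is logged.
        return False
    return use_arrow
```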
I marked it as WIP because I wanted to file a separate PR for the fallback mechanism with tests. Once that PR is in, this PR will be unblocked immediately.

Let's make this ready ASAP for 4.0.
…t and output types

### What changes were proposed in this pull request?

Introduce a fallback mechanism for Arrow-optimized Python UDFs when either the input or return types contain User-Defined Types (UDTs). If UDTs are detected, the system logs a warning and switches to the currently default, non-Arrow-optimized UDF.

### Why are the changes needed?

To unblock enabling Arrow-optimized Python UDFs by default; see [pr](#49482).

### Does this PR introduce _any_ user-facing change?

Yes. UDT input and output types will no longer fail Arrow Python UDFs, as shown below:

```py
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> from pyspark.testing.sqlutils import ExamplePoint, ExamplePointUDT

# UDT input
>>> from pyspark.sql.types import *
>>> row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
>>> df = spark.createDataFrame([row])
>>>
>>> udf1 = F.udf(lambda p: p.y, DoubleType(), useArrow=True)
>>> df.select(udf1(df.point)).show()
25/02/03 17:49:57 WARN ExtractPythonUDFs: Arrow optimization disabled due to UDT input or return type. Falling back to non-Arrow-optimized UDF execution.
+---------------+
|<lambda>(point)|
+---------------+
|            2.0|
+---------------+

# UDT output
>>> row = Row(value=3.0)
>>> df = spark.createDataFrame([row])
>>> udf_with_udt_output = F.udf(lambda v: ExamplePoint(v, v + 1), ExamplePointUDT(), useArrow=True)
>>> df.select(udf_with_udt_output(df.value)).show()
25/02/03 17:51:43 WARN ExtractPythonUDFs: Arrow optimization disabled due to UDT input or return type. Falling back to non-Arrow-optimized UDF execution.
+---------------+
|<lambda>(value)|
+---------------+
|     (3.0, 4.0)|
+---------------+
```

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49786 from xinrong-meng/udt_arrow_udf.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
(cherry picked from commit ac2f3a4)
Force-pushed from 00d4e03 to e25b0eb.
The Arrow fallback PR is in, so this PR should be unblocked. I'll keep an eye on testing and make it ready ASAP! Thank you @HyukjinKwon @dongjoon-hyun.

Can we address #49482 (comment), @xinrong-meng?
Force-pushed from ac09220 to e25b0eb.
Can you take a look at the test failures to make sure? I think those failures look related.
### What changes were proposed in this pull request?

Turn on Arrow optimization for Python UDFs by default.

### Why are the changes needed?

Arrow optimization was introduced in 3.4.0; see SPARK-40307 for more context.

Arrow-optimized Python UDFs are approximately 1.6 times faster than the original pickled Python UDFs. More details can be found in this blog post.

In version 4.0.0, we propose enabling the optimization by default. If PyArrow is not installed, Spark falls back to the original pickled Python UDF.
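For illustration, a small sketch of how the new default interacts with the existing knobs, assuming the `useArrow=None` default defers to the session conf `spark.sql.execution.pythonUDF.arrow.enabled`; the `plus_one` names are illustrative, and `spark` is an active SparkSession as in the examples above.

```py
# Illustrative sketch: the per-UDF useArrow flag vs. the session conf.
# useArrow=None (the default) defers to the conf, which this PR flips to
# true; useArrow=False opts a single UDF out.
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")  # new default

plus_one = F.udf(lambda x: x + 1, IntegerType())                          # Arrow-optimized
plus_one_pickled = F.udf(lambda x: x + 1, IntegerType(), useArrow=False)  # pickled path
```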
### Does this PR introduce _any_ user-facing change?

Yes, Arrow optimization is now enabled for Python UDFs by default.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.