diff --git a/python/docs/source/user_guide/sql/arrow_pandas.rst b/python/docs/source/user_guide/sql/arrow_pandas.rst
index fde40140110f9..b9e389f8fe7dd 100644
--- a/python/docs/source/user_guide/sql/arrow_pandas.rst
+++ b/python/docs/source/user_guide/sql/arrow_pandas.rst
@@ -356,8 +356,8 @@ Arrow Python UDFs are user defined functions that are executed row-by-row, utili
 transfer and serialization. To define an Arrow Python UDF, you can use the :meth:`udf` decorator or wrap the function
 with the :meth:`udf` method, ensuring the ``useArrow`` parameter is set to True. Additionally, you can enable Arrow
 optimization for Python UDFs throughout the entire SparkSession by setting the Spark configuration
-``spark.sql.execution.pythonUDF.arrow.enabled`` to true. It's important to note that the Spark configuration takes
-effect only when ``useArrow`` is either not set or set to None.
+``spark.sql.execution.pythonUDF.arrow.enabled`` to true, which is the default. It's important to note that the Spark
+configuration takes effect only when ``useArrow`` is either not set or set to None.
 
 The type hints for Arrow Python UDFs should be specified in the same way as for default, pickled Python UDFs.
 
diff --git a/python/docs/source/user_guide/sql/type_conversions.rst b/python/docs/source/user_guide/sql/type_conversions.rst
index 2f13701995ef2..80f8aa83db7eb 100644
--- a/python/docs/source/user_guide/sql/type_conversions.rst
+++ b/python/docs/source/user_guide/sql/type_conversions.rst
@@ -57,7 +57,7 @@ are listed below:
       - Default
     * - spark.sql.execution.pythonUDF.arrow.enabled
       - Enable PyArrow in PySpark. See more `here `_.
-      - False
+      - True
     * - spark.sql.pyspark.inferNestedDictAsStruct.enabled
       - When enabled, nested dictionaries are inferred as StructType. Otherwise, they are inferred as MapType.
       - False
diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index 224ef34fd5edc..0bdfa27fc7021 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -328,7 +328,8 @@ def transformSchema(self, schema: StructType) -> StructType:
 
     def _transform(self, dataset: DataFrame) -> DataFrame:
         self.transformSchema(dataset.schema)
-        transformUDF = udf(self.createTransformFunc(), self.outputDataType())
+        # TODO(SPARK-48515): Use Arrow Python UDF
+        transformUDF = udf(self.createTransformFunc(), self.outputDataType(), useArrow=False)
         transformedDataset = dataset.withColumn(
             self.getOutputCol(), transformUDF(dataset[self.getInputCol()])
         )
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index d228157def4b2..a042344339834 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -3834,12 +3834,13 @@ def __init__(self, models: List[ClassificationModel]):
         )
 
     def _transform(self, dataset: DataFrame) -> DataFrame:
+        # TODO(SPARK-48515): Use Arrow Python UDF
         # determine the input columns: these need to be passed through
         origCols = dataset.columns
 
         # add an accumulator column to store predictions of all the models
         accColName = "mbc$acc" + str(uuid.uuid4())
-        initUDF = udf(lambda _: [], ArrayType(DoubleType()))
+        initUDF = udf(lambda _: [], ArrayType(DoubleType()), useArrow=False)
         newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]]))
 
         # persist if underlying dataset is not persistent.
@@ -3859,6 +3860,7 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
             updateUDF = udf(
                 lambda predictions, prediction: predictions + [prediction.tolist()[1]],
                 ArrayType(DoubleType()),
+                useArrow=False,
             )
             transformedDataset = model.transform(aggregatedDataset).select(*columns)
             updatedDataset = transformedDataset.withColumn(
@@ -3883,7 +3885,7 @@ def func(predictions: Iterable[float]) -> Vector:
                     predArray.append(x)
                 return Vectors.dense(predArray)
 
-            rawPredictionUDF = udf(func, VectorUDT())
+            rawPredictionUDF = udf(func, VectorUDT(), useArrow=False)
             aggregatedDataset = aggregatedDataset.withColumn(
                 self.getRawPredictionCol(), rawPredictionUDF(aggregatedDataset[accColName])
             )
@@ -3895,6 +3897,7 @@ def func(predictions: Iterable[float]) -> Vector:
                     max(enumerate(predictions), key=operator.itemgetter(1))[0]
                 ),
                 DoubleType(),
+                useArrow=False,
             )
             aggregatedDataset = aggregatedDataset.withColumn(
                 self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index d3506bf1c6b07..c85f8438079f0 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -906,8 +906,12 @@ def checker(foldNum: int) -> bool:
                 from pyspark.sql.connect.udf import UserDefinedFunction
             else:
                 from pyspark.sql.functions import UserDefinedFunction  # type: ignore[assignment]
+            from pyspark.util import PythonEvalType
 
-            checker_udf = UserDefinedFunction(checker, BooleanType())
+            # TODO(SPARK-48515): Use Arrow Python UDF
+            checker_udf = UserDefinedFunction(
+                checker, BooleanType(), evalType=PythonEvalType.SQL_BATCHED_UDF
+            )
             for i in range(nFolds):
                 training = dataset.filter(checker_udf(dataset[foldCol]) & (col(foldCol) != lit(i)))
                 validation = dataset.filter(
diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index ab3a2da48ba55..6045e441222de 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -41,6 +41,7 @@
     UDFRegistration as PySparkUDFRegistration,
     UserDefinedFunction as PySparkUserDefinedFunction,
 )
+from pyspark.sql.utils import has_arrow
 from pyspark.errors import PySparkTypeError, PySparkRuntimeError
 
 if TYPE_CHECKING:
@@ -58,6 +59,7 @@ def _create_py_udf(
     returnType: "DataTypeOrString",
     useArrow: Optional[bool] = None,
 ) -> "UserDefinedFunctionLike":
+    is_arrow_enabled = False
     if useArrow is None:
         is_arrow_enabled = False
         try:
@@ -78,6 +80,14 @@ def _create_py_udf(
 
     eval_type: int = PythonEvalType.SQL_BATCHED_UDF
 
+    if is_arrow_enabled and not has_arrow:
+        is_arrow_enabled = False
+        warnings.warn(
+            "Arrow optimization failed to enable because PyArrow is not installed. "
+            "Falling back to a non-Arrow-optimized UDF.",
+            RuntimeWarning,
+        )
+
     if is_arrow_enabled:
         try:
             is_func_with_args = len(getfullargspec(f).args) > 0
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 93ac6da1e14c5..e08243b6b9b7f 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -26291,7 +26291,8 @@ def udf(
         Defaults to :class:`StringType`.
     useArrow : bool, optional
         whether to use Arrow to optimize the (de)serialization. When it is None, the
-        Spark config "spark.sql.execution.pythonUDF.arrow.enabled" takes effect.
+        Spark config "spark.sql.execution.pythonUDF.arrow.enabled" takes effect,
+        which is "true" by default.
 
     Examples
     --------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index cf093bd936437..2fd75390f48dc 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -34,7 +34,7 @@
     StructType,
     _parse_datatype_string,
 )
-from pyspark.sql.utils import get_active_spark_context
+from pyspark.sql.utils import get_active_spark_context, has_arrow
 from pyspark.sql.pandas.types import to_arrow_type
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
 from pyspark.errors import PySparkTypeError, PySparkNotImplementedError, PySparkRuntimeError
@@ -118,7 +118,7 @@ def _create_py_udf(
     # Note: The values of 'SQL Type' are DDL formatted strings, which can be used as `returnType`s.
     # Note: The values inside the table are generated by `repr`. X' means it throws an exception
     # during the conversion.
-
+    is_arrow_enabled = False
     if useArrow is None:
         from pyspark.sql import SparkSession
 
@@ -131,6 +131,14 @@ def _create_py_udf(
     else:
         is_arrow_enabled = useArrow
 
+    if is_arrow_enabled and not has_arrow:
+        is_arrow_enabled = False
+        warnings.warn(
+            "Arrow optimization failed to enable because PyArrow is not installed. "
+            "Falling back to a non-Arrow-optimized UDF.",
+            RuntimeWarning,
+        )
+
     eval_type: int = PythonEvalType.SQL_BATCHED_UDF
 
     if is_arrow_enabled:
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b0782d04cba3d..63beda40dc52d 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -63,6 +63,15 @@
     from pyspark.pandas._typing import IndexOpsLike, SeriesOrIndex
 
 
+has_arrow: bool = False
+try:
+    import pyarrow  # noqa: F401
+
+    has_arrow = True
+except ImportError:
+    pass
+
+
 FuncT = TypeVar("FuncT", bound=Callable[..., Any])
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ee8d7d53c92b7..e43a2f27aef60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3497,7 +3497,7 @@ object SQLConf {
         "can only be enabled when the given function takes at least one argument.")
       .version("3.4.0")
      .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val PYTHON_UDF_ARROW_CONCURRENCY_LEVEL =
     buildConf("spark.sql.execution.pythonUDF.arrow.concurrency.level")
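Note: the snippet below is a minimal usage sketch of the behavior this patch changes, not part of the patch itself. It assumes an active SparkSession bound to `spark` and PyArrow installed; the `to_upper` function, the `name` column, and the sample rows are made up for illustration.

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # With spark.sql.execution.pythonUDF.arrow.enabled now defaulting to true,
    # a plain udf() is Arrow-optimized unless useArrow is explicitly set to False.
    @udf(returnType=StringType())
    def to_upper(s):
        return s.upper() if s is not None else None

    # Opt out per UDF, as the ML code paths in this patch do via useArrow=False:
    to_upper_pickled = udf(
        lambda s: s.upper() if s is not None else None, StringType(), useArrow=False
    )

    df = spark.createDataFrame([("alice",), ("bob",)], ["name"])
    df.select(to_upper("name"), to_upper_pickled("name")).show()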