
Commit 371541d

vicennial authored and HyukjinKwon committed
[SPARK-51112][CONNECT] Avoid using pyarrow's to_pandas on an empty table
### What changes were proposed in this pull request?

When the `pyarrow` table is empty, avoid calling the `to_pandas` method due to potential segfault failures. Instead, an empty pandas DataFrame is created manually.

### Why are the changes needed?

Consider the following code:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, ArrayType, StringType, StructType, IntegerType
import faulthandler

faulthandler.enable()

spark = SparkSession.builder \
    .remote("sc://localhost:15002") \
    .getOrCreate()

sp_df = spark.createDataFrame(
    data=[],
    schema=StructType(
        [
            StructField(
                name='b_int',
                dataType=IntegerType(),
                nullable=False,
            ),
            StructField(
                name='b',
                dataType=ArrayType(ArrayType(StringType(), True), True),
                nullable=True,
            ),
        ]
    )
)

print(sp_df)
print('Spark dataframe generated.')
print(sp_df.toPandas())
print('Pandas dataframe generated.')
```

Executing this may lead to a segfault when the line `sp_df.toPandas()` is run. Example:

```
Thread 0x00000001f1904f40 (most recent call first):
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyarrow/pandas_compat.py", line 808 in table_to_dataframe
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyspark/sql/connect/client/core.py", line 949 in to_pandas
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyspark/sql/connect/dataframe.py", line 1857 in toPandas
  File "<python-input-3>", line 1 in <module>
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/code.py", line 92 in runcode
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/console.py", line 205 in runsource
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/code.py", line 313 in push
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/simple_interact.py", line 160 in run_multiline_interactive_console
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/main.py", line 59 in interactive_console
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/__main__.py", line 6 in <module>
  File "<frozen runpy>", line 88 in _run_code
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49834 from vicennial/SPARK-51112.

Lead-authored-by: vicennial <[email protected]>
Co-authored-by: Venkata Sai Akhil Gudesa <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 9d88020)
Signed-off-by: Hyukjin Kwon <[email protected]>
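The gist of the change, seen in the diff below, can be sketched as a small standalone helper; the function name `arrow_table_to_pandas` and the sample table here are illustrative only, not the actual Connect client code:

```python
import pandas as pd
import pyarrow as pa


def arrow_table_to_pandas(table: pa.Table, column_names: list) -> pd.DataFrame:
    # Sketch of the SPARK-51112 approach: for an empty Arrow table, skip
    # pyarrow's to_pandas entirely and build the empty pandas DataFrame
    # directly from the column names.
    if table.num_rows == 0:
        return pd.DataFrame(columns=column_names)
    pdf = table.to_pandas()
    pdf.columns = column_names
    return pdf


# An empty table with a nested list<list<string>> column, mirroring the
# reproduction above, never reaches pyarrow's conversion path.
empty = pa.table(
    {
        "b_int": pa.array([], type=pa.int32()),
        "b": pa.array([], type=pa.list_(pa.list_(pa.string()))),
    }
)
print(arrow_table_to_pandas(empty, ["b_int", "b"]))
```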
1 parent 5e3d0e4 commit 371541d

File tree: 3 files changed, 73 additions & 27 deletions

python/pyspark/sql/connect/client/core.py

Lines changed: 33 additions & 27 deletions
@@ -933,33 +933,39 @@ def to_pandas(
         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)
 
-        # Rename columns to avoid duplicated column names.
-        renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
-
-        pandas_options = {}
-        if self_destruct:
-            # Configure PyArrow to use as little memory as possible:
-            # self_destruct - free columns as they are converted
-            # split_blocks - create a separate Pandas block for each column
-            # use_threads - convert one column at a time
-            pandas_options.update(
-                {
-                    "self_destruct": True,
-                    "split_blocks": True,
-                    "use_threads": False,
-                }
-            )
-        if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
-            # A legacy option to coerce date32, date64, duration, and timestamp
-            # time units to nanoseconds when converting to pandas.
-            # This option can only be added since 13.0.0.
-            pandas_options.update(
-                {
-                    "coerce_temporal_nanoseconds": True,
-                }
-            )
-        pdf = renamed_table.to_pandas(**pandas_options)
-        pdf.columns = schema.names
+        # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
+        # DataFrame, as it may fail with a segmentation fault. Instead, we create an empty pandas
+        # DataFrame manually with the correct schema.
+        if table.num_rows == 0:
+            pdf = pd.DataFrame(columns=schema.names)
+        else:
+            # Rename columns to avoid duplicated column names.
+            renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+
+            pandas_options = {}
+            if self_destruct:
+                # Configure PyArrow to use as little memory as possible:
+                # self_destruct - free columns as they are converted
+                # split_blocks - create a separate Pandas block for each column
+                # use_threads - convert one column at a time
+                pandas_options.update(
+                    {
+                        "self_destruct": True,
+                        "split_blocks": True,
+                        "use_threads": False,
+                    }
+                )
+            if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+                # A legacy option to coerce date32, date64, duration, and timestamp
+                # time units to nanoseconds when converting to pandas.
+                # This option can only be added since 13.0.0.
+                pandas_options.update(
+                    {
+                        "coerce_temporal_nanoseconds": True,
+                    }
+                )
+            pdf = renamed_table.to_pandas(**pandas_options)
+            pdf.columns = schema.names
 
         if len(pdf.columns) > 0:
             timezone: Optional[str] = None
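One detail of the new empty-table branch: `pd.DataFrame(columns=schema.names)` yields a zero-row frame whose columns default to `object` dtype rather than dtypes derived from the Arrow schema. A tiny pandas-only illustration (not part of the patch):

```python
import pandas as pd

pdf = pd.DataFrame(columns=["b_int", "b"])
print(pdf.shape)   # (0, 2)
print(pdf.dtypes)  # both columns default to object dtype
```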

python/pyspark/sql/tests/connect/test_parity_collection.py

Lines changed: 3 additions & 0 deletions
@@ -42,6 +42,9 @@ def test_to_pandas_with_duplicated_column_names(self):
     def test_to_pandas_from_mixed_dataframe(self):
         self.check_to_pandas_from_mixed_dataframe()
 
+    def test_to_pandas_for_empty_df_with_nested_array_columns(self):
+        self.check_to_pandas_for_empty_df_with_nested_array_columns()
+
 
 if __name__ == "__main__":
     import unittest

python/pyspark/sql/tests/test_collection.py

Lines changed: 37 additions & 0 deletions
@@ -18,9 +18,11 @@
 import unittest
 
 from pyspark.sql.types import (
+    ArrayType,
     StringType,
     IntegerType,
     StructType,
+    StructField,
     BooleanType,
     DateType,
     TimestampType,
@@ -35,6 +37,7 @@
     pandas_requirement_message,
     pyarrow_requirement_message,
 )
+from pyspark.testing.utils import assertDataFrameEqual
 
 
 class DataFrameCollectionTestsMixin:
@@ -289,6 +292,40 @@ def check_to_pandas_for_array_of_struct(self, is_arrow_enabled):
         else:
             self.assertEqual(type(pdf["array_struct_col"][0]), list)
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        pandas_requirement_message or pyarrow_requirement_message,
+    )
+    def test_to_pandas_for_empty_df_with_nested_array_columns(self):
+        for arrow_enabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
+                self.check_to_pandas_for_empty_df_with_nested_array_columns()
+
+    def check_to_pandas_for_empty_df_with_nested_array_columns(self):
+        # SPARK-51112: Segfault must not occur when converting empty DataFrame with nested array
+        # columns to pandas DataFrame.
+        import pandas as pd
+
+        df = self.spark.createDataFrame(
+            data=[],
+            schema=StructType(
+                [
+                    StructField(
+                        name="b_int",
+                        dataType=IntegerType(),
+                        nullable=False,
+                    ),
+                    StructField(
+                        name="b",
+                        dataType=ArrayType(ArrayType(StringType(), True), True),
+                        nullable=True,
+                    ),
+                ]
+            ),
+        )
+        expected_pdf = pd.DataFrame(columns=["b_int", "b"])
+        assertDataFrameEqual(df.toPandas(), expected_pdf)
+
     def test_to_local_iterator(self):
         df = self.spark.range(8, numPartitions=4)
         expected = df.collect()
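The new check compares two pandas DataFrames via `assertDataFrameEqual`, which the test applies directly to pandas objects. Condensed to the empty case, the assertion amounts to the following sketch (test-harness pieces such as `self.spark` and `self.sql_conf` are omitted):

```python
import pandas as pd
from pyspark.testing.utils import assertDataFrameEqual

# Both sides are zero-row pandas DataFrames; the comparison passes because the
# column names match and there are no rows to check.
assertDataFrameEqual(pd.DataFrame(columns=["b_int", "b"]),
                     pd.DataFrame(columns=["b_int", "b"]))
```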
