
Commit 1a8859d

ueshin authored and HyukjinKwon committed
[SPARK-52579][PYTHON] Set periodical traceback dump for Python workers
### What changes were proposed in this pull request?

Sets up a periodic traceback dump for Python workers. To enable it:

- `spark.python.worker.tracebackDumpIntervalSeconds` (SparkConf, default `0`)

  > The interval (in seconds) for Python workers to dump their tracebacks. If it's positive, the Python worker will periodically dump the traceback into its executor's `stderr`. The default is `0`, which means it is disabled.

- `spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds` (SQLConf, falls back to the above)

  > Same as `spark.python.worker.tracebackDumpIntervalSeconds` for Python execution with DataFrame and SQL. It can be changed at runtime.

### Why are the changes needed?

To monitor the progress of Python workers.

### Does this PR introduce _any_ user-facing change?

Yes, the traceback will be dumped periodically when the config is set to a positive number.

```py
>>> from pyspark.sql.functions import *
>>> import time
>>>
>>> @udf("long")
... def f(x):
...     time.sleep(12)
...     return x
...
>>> df = spark.range(1).select(f(col("id")))
>>> spark.conf.set('spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds', 5)
>>>
>>> df.show()
Timeout (0:00:05)!
Thread 0x00000001ede60f80 (most recent call first):
  File "<stdin>", line 3 in f
  File "/.../python/pyspark/util.py", line 135 in wrapper
  File "/.../python/pyspark/worker.py", line 121 in <lambda>
  ...
Timeout (0:00:05)!
Thread 0x00000001ede60f80 (most recent call first):
  File "<stdin>", line 3 in f
  File "/.../python/pyspark/util.py", line 135 in wrapper
  File "/.../python/pyspark/worker.py", line 121 in <lambda>
  ...
+-----+
|f(id)|
+-----+
|    0|
+-----+
```

### How was this patch tested?

Manually, and with existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51286 from ueshin/issues/SPARK-52579/traceback.

Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent ebf5a2a commit 1a8859d

File tree

10 files changed: +47 -0 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 5 additions & 0 deletions
@@ -170,6 +170,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val faultHandlerEnabled: Boolean = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
   protected val idleTimeoutSeconds: Long = conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
   protected val killOnIdleTimeout: Boolean = conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
+  protected val tracebackDumpIntervalSeconds: Long =
+    conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false

@@ -267,6 +269,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (faultHandlerEnabled) {
       envVars.put("PYTHON_FAULTHANDLER_DIR", faultHandlerLogDir.toString)
     }
+    if (tracebackDumpIntervalSeconds > 0L) {
+      envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
+    }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
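The runner hands the interval to the worker process through an environment variable, mirroring the existing faulthandler handoff. A minimal sketch of that contract, with `worker.py` standing in for the real PySpark worker entry point:

```py
import os
import subprocess
import sys

# Sketch of the JVM-side contract: the parent exports the env var only when
# the interval is positive, so its presence alone tells the child to enable
# periodic dumps. "worker.py" here is a stand-in, not the real PySpark worker.
interval_seconds = 5  # would come from the Spark conf
env = dict(os.environ)
if interval_seconds > 0:
    env["PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS"] = str(interval_seconds)
subprocess.run([sys.executable, "worker.py"], env=env)
```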

core/src/main/scala/org/apache/spark/internal/config/Python.scala

Lines changed: 10 additions & 0 deletions
@@ -117,4 +117,14 @@ private[spark] object Python {
       .version("4.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  val PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+    ConfigBuilder("spark.python.worker.tracebackDumpIntervalSeconds")
+      .doc("The interval (in seconds) for Python workers to dump their tracebacks. " +
+        "If it's positive, the Python worker will periodically dump the traceback into " +
+        "its executor's `stderr`. The default is `0`, which means it is disabled.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "The interval should be 0 or positive.")
+      .createWithDefault(0)
 }
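Since this is a static SparkConf entry, it has to be set before the session starts. A minimal sketch of enabling it, assuming a Spark build that includes this change:

```py
from pyspark.sql import SparkSession

# Dump every Python worker's traceback every 10 seconds; the default of 0
# leaves the feature disabled.
spark = (
    SparkSession.builder
    .config("spark.python.worker.tracebackDumpIntervalSeconds", "10")
    .getOrCreate()
)
```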

python/pyspark/worker.py

Lines changed: 9 additions & 0 deletions
@@ -2347,6 +2347,9 @@ def func(_, it):
 
 def main(infile, outfile):
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
+    if tracebackDumpIntervalSeconds is not None:
+        tracebackDumpIntervalSeconds = int(tracebackDumpIntervalSeconds)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))

@@ -2358,6 +2361,9 @@ def main(infile, outfile):
         if split_index == -1:  # for unit tests
             sys.exit(-1)
 
+        if tracebackDumpIntervalSeconds is not None and tracebackDumpIntervalSeconds > 0:
+            faulthandler.dump_traceback_later(tracebackDumpIntervalSeconds, repeat=True)
+
         check_python_version(infile)
 
         # read inputs only for a barrier task

@@ -2465,6 +2471,9 @@ def process():
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
@@ -3549,6 +3549,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(Python.PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
 
+  val PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+    buildConf("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds")
+      .doc(
+        s"Same as ${Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS.key} " +
+          "for Python execution with DataFrame and SQL. It can be changed at runtime.")
+      .version("4.1.0")
+      .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
   val PYSPARK_PLOT_MAX_ROWS =
     buildConf("spark.sql.pyspark.plotting.max_rows")
       .doc("The visual limit on plots. If set to 1000 for top-n-based plots (pie, bar, barh), " +

@@ -6731,6 +6739,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def pythonUDFWorkerKillOnIdleTimeout: Boolean = getConf(PYTHON_UDF_WORKER_KILL_ON_IDLE_TIMEOUT)
 
+  def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
+    getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
   def pythonUDFArrowConcurrencyLevel: Option[Int] = getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
 
   def pythonUDFArrowFallbackOnUDT: Boolean = getConf(PYTHON_UDF_ARROW_FALLBACK_ON_UDT)
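Because the SQL conf falls back to the SparkConf value and is runtime-mutable, it can be toggled per session without a restart. A minimal sketch, assuming an active `spark` session:

```py
# Enable dumps every 5 seconds for DataFrame/SQL Python execution only.
spark.conf.set("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds", "5")

# Revert to the fallback (spark.python.worker.tracebackDumpIntervalSeconds).
spark.conf.unset("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds")
```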

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -49,6 +49,8 @@ abstract class BaseArrowPythonRunner(
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val errorOnDuplicatedFieldNames: Boolean = true

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -58,6 +58,8 @@ class ArrowPythonUDTFRunner(
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val errorOnDuplicatedFieldNames: Boolean = true

sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -62,6 +62,8 @@ class CoGroupedArrowPythonRunner(
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback

sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -48,6 +48,8 @@ abstract class BasePythonUDFRunner(
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val bufferSize: Int = SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
   override val batchSizeForPythonUDF: Int =

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -78,6 +78,8 @@ class ApplyInPandasWithStatePythonRunner(
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   private val sqlConf = SQLConf.get

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala

Lines changed: 2 additions & 0 deletions
@@ -102,6 +102,8 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
   override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
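All of these runners read the same SQLConf getter, so the interval also covers streaming foreach sinks via `PythonForeachWriter`. A hypothetical end-to-end sketch using the built-in `rate` source (names and timings are illustrative, not from this PR):

```py
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds", "5")

def process_row(row):
    # Deliberately slow, so the periodic dumps show up in executor stderr.
    time.sleep(12)

query = (
    spark.readStream.format("rate").load()
    .writeStream.foreach(process_row)
    .start()
)
query.awaitTermination(30)  # run briefly, then inspect executor stderr
query.stop()
```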
