-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-53413] Shuffle cleanup for commands #52157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@cloud-fan @ulysses-you Please help review when you get a chance to. |
case exec: ShuffleExchangeLike => | ||
exec.shuffleId | ||
} | ||
case command: V2CommandExec => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about other commands?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have handled the DataWritingCommandExec,
I think it might be tricky for ExecutedCommandExec/V1Commands, since the spark plan is created internally.
Should we enumerate each SparkPlan type and handle them?
@@ -150,7 +150,7 @@ class QueryExecution( | |||
// with the rest of processing of the root plan being just outputting command results, | |||
// for eagerly executed commands we mark this place as beginning of execution. | |||
tracker.setReadyForExecution() | |||
val qe = sparkSession.sessionState.executePlan(p, mode) | |||
val qe = sparkSession.sessionState.executePlan(p, mode, shuffleCleanupMode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of calling executePlan
, can we construct QueryExecution
instance directly? I feel this executePlan
method is a bit useless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to have been introduced here. It doesnt mention the rationale behind. But like you mentioned it does nt serve any real purpose, creating new QueryExecution
Why do we need this handling in commands? What about other flows? Or how is this different from other flows? |
case exec: ShuffleExchangeLike => | ||
exec.shuffleId | ||
} | ||
case command: V2CommandExec => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have handled the DataWritingCommandExec,
I think it might be tricky for ExecutedCommandExec/V1Commands, since the spark plan is created internally.
Should we enumerate each SparkPlan type and handle them?
@@ -3586,7 +3586,8 @@ object SQLConf { | |||
val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED = | |||
buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled") | |||
.doc("When enabled, shuffle files will be cleaned up at the end of classic " + | |||
"SQL executions.") | |||
"SQL executions. Note that this cleanup may cause stage retries and regenerate " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is related to this comment
ef482e2
to
25406bc
Compare
All other flows were handled here |
What changes were proposed in this pull request?
Changes to cleanup shuffle generated from running commands(eg writes)
This was also brought by @cloud-fan and @ulysses-you here
Why are the changes needed?
To cleanupshuffle generated from commands
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test added
Was this patch authored or co-authored using generative AI tooling?
No