
Support Parallel save result with export_workspace. #995

Open
HansVRP opened this issue Jan 14, 2025 · 8 comments

@HansVRP

HansVRP commented Jan 14, 2025

Example Job: j-25011314204947baa2057fa6f64bae8b

More info: Spark is perfectly happy running multiple jobs and stages concurrently, and doing so keeps the executor allocation rate high.
So in the presence of multiple 'save_result' nodes, running them in parallel could really help overall performance.
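A minimal sketch of the idea (not the actual driver code): Spark's scheduler accepts actions from multiple driver threads, so each save_result branch can trigger its own Spark job concurrently. Here write_result is a hypothetical stand-in for a blocking per-node action such as rdd.saveAsTextFile(...), simulated with a sleep so the overlap is visible.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def write_result(node_id: str) -> str:
    # Stand-in for a blocking Spark action (e.g. saveAsTextFile) for one node.
    time.sleep(0.2)
    return f"{node_id}: written"

save_result_nodes = ["saveresult1", "saveresult2", "saveresult3"]

start = time.monotonic()
# One driver thread per save_result node: the three "jobs" run concurrently
# instead of back to back.
with ThreadPoolExecutor(max_workers=len(save_result_nodes)) as pool:
    outcomes = list(pool.map(write_result, save_result_nodes))
elapsed = time.monotonic() - start

print(outcomes)
```

With enough executors, the total wall time approaches that of the slowest write rather than the sum of all writes.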

@JeroenVerstraelen
Contributor

TODO: Estimate the amount of work this issue will require

@JeroenVerstraelen
Contributor

LCFM would also benefit from this issue.

@EmileSonneveld
Contributor

A quick attempt to distribute the SaveResult nodes over multiple Python threads gave an error when trying to serialize the RDDs inside them:
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

_pickle.PicklingError: Could not pickle the task to send it to the workers.

It might be necessary to run ProcessGraphDeserializer.evaluate in each thread too.
Alternatively, find a way to start all the write_assets tasks at once from the main thread, and then poll for the results on the main thread as well.
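A rough sketch of that second option, under stated assumptions: every write is kicked off from the main thread and the resulting handles are polled there, so no closure that references the SparkContext ever gets pickled and shipped to workers. concurrent.futures.Future stands in for a Spark FutureAction, and write_assets is a placeholder, not the real driver function.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def write_assets(result_id: str) -> str:
    # Placeholder for the blocking write of one save_result node.
    time.sleep(0.1)
    return f"{result_id}: done"

pool = ThreadPoolExecutor(max_workers=4)
# Start every write up front, from the main thread.
futures: dict[str, Future] = {
    rid: pool.submit(write_assets, rid) for rid in ["r1", "r2", "r3"]
}

# Poll on the main thread until every write has finished.
results = {}
while futures:
    for rid, fut in list(futures.items()):
        if fut.done():
            results[rid] = fut.result()
            del futures[rid]
    time.sleep(0.01)
pool.shutdown()

print(results)
```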

@JeroenVerstraelen
Contributor

Would something like collectAsync help in this case?

val value = rdd.collect()      // RDD elements are copied to the Spark driver
val value = rdd.collectAsync() // no copy yet
value.get()                    // now the RDD elements are copied to the Spark driver
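The snippet above is Scala (collectAsync comes from Spark's AsyncRDDActions); PySpark does not expose collectAsync, but the same non-blocking shape can be approximated by running the blocking action in a helper thread. slow_collect is a placeholder for a real rdd.collect().

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_collect() -> list:
    # Placeholder for a blocking rdd.collect().
    time.sleep(0.1)
    return [1, 2, 3]

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_collect)  # returns immediately, like collectAsync
    # ... the driver could kick off other save_result actions here ...
    value = future.result()  # like FutureAction.get(): now the elements arrive

print(value)
```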

@EmileSonneveld
Contributor

No huge difference found with a small job that has multiple save results. Launched with and without concurrent-save-results several times in a row.

j-2501291213194f2aa1f2b13e4f46d750
a-38c11ad2cff94ed09b0d556432c8a3d4
"concurrent-save-results": 10
Spark Duration: 59sec
Wall time: 96 seconds

j-25012912081248e292aa4d2a0f17109a
a-f08bd3ac82694145a2c0bbf3e8a27489
"concurrent-save-results": null
Spark Duration: 3.0min
Wall time: 205 seconds


j-25012912210849f99e7b0fee685bdcbf
a-3e0656f10699476793484210d1780545
"concurrent-save-results": 10
Spark Duration: 1.2 min
Wall time: 146 seconds

j-2501291221144b03931163b16921fba6
a-3ebac615b87843b6bfe54b0ea72cad87
"concurrent-save-results": null
Spark Duration: 2.1 min
Wall time: 146 seconds


j-250129124820445d942e0b00a5d962f7
a-71393a4aa8d741fca53810a7d6075f52
"concurrent-save-results": 10
Spark Duration: 1.0min
Wall time: 101 seconds

j-250129124513489191fe5ab26734df35
a-76e20cd5372b4770be4e6cf0bab477f5
"concurrent-save-results": null
Spark Duration: 52 sec
Wall time: 89 seconds

@VictorVerhaert

This feature tries to minimize executor idle time, right? Perhaps you could try increasing max-executors to see whether we can increase the speed without increasing the total cost?
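For reference, both knobs discussed in this thread are batch job options; a job-options fragment combining them (with the values used in the runs below) might look like:

```json
{
  "max-executors": 100,
  "concurrent-save-results": 10
}
```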

@EmileSonneveld
Contributor

Yes. On a large graph the max-executors is clearly limiting. I am running a large test with max-executors on 100. I'll report back when that one is finished.

With a medium-sized job, max-executors did not have a big effect, but concurrent-save-results did:

j-25012913172644639fd9e570ea3b365e
a-ac17868b4dd649b8b2101702743eeeae
Incurred Costs: 5 credits
Wall time: 401 seconds
Spark Duration: 6.3 min
"concurrent-save-results": null

j-25012913112845b5a2e711c040634cd4
a-72064b862b20485190c476ef8134c96c
Incurred Costs: 4 credits
Wall time: 148 seconds
Spark Duration: 2.1min
"concurrent-save-results": 10

j-2501291340294a82960477f615627406
a-0e0dcaf10ef34a1c8e94ba390dfaa252
Incurred Costs: 4 credits
Wall time: 158 seconds
Spark Duration: 2.3 min
"max-executors": 100,
"concurrent-save-results": 10

@EmileSonneveld
Contributor

Results when running the huge process graph Hans provided:
the performance increase from concurrent-save-results alone is difficult to measure,
but it does help on top of max-executors.

j-250129103340478f8046b14a023e9ebf
a-96ee39a8a66b4ce4b4f9ca855ecbf1f6
Incurred Costs: 335 credits
Wall time: 9,497 seconds
Spark Duration: 2.6h
"concurrent-save-results": null

j-2501291033484e39a94d483cf330c624
a-78b22161b4164dd0bfa801c4918ccb80
Incurred Costs: 364 credits
Wall time: 9,212 seconds
Spark Duration: 2.6 h
"concurrent-save-results": 2

j-25012812285248c3b2a275faaa824121
a-809697aff06e4a76bb02204aef86e0df
Incurred Costs: 416 credits
Wall time: 10,732 seconds
Spark Duration: 3.0 h
"concurrent-save-results": 30

j-2501291510034065913587449c97f8bd
a-8e7f44e65ecb4df8b7da651c6bf6a441
Incurred Costs: 597 credits
Wall time: 4,337 seconds
Spark Duration: 1.2h
"max-executors": 100

j-2501291357374f90a0c769f337bfbb60
a-7352265cea1d40788dbc63a3a27d8c61
Incurred Costs: 475 credits
Wall time: 3,654 seconds
Spark Duration: 1.0h
"max-executors": 100,
"concurrent-save-results": 10

This effect is seen with parallel save results and not enough executors; normally the collects should all take about the same amount of time:
(image attached)
