Skip to content

Commit 1b75f3b

Browse files
Devaraj Ksrowen
Devaraj K
authored andcommitted
[SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode
## What changes were proposed in this pull request? Added a new configuration 'spark.mesos.driver.memoryOverhead' for providing the driver memory overhead in mesos cluster mode. ## How was this patch tested? Verified it manually, Resource Scheduler allocates (drivermemory+ driver memoryOverhead) for driver in mesos cluster mode. Closes apache#17726 from devaraj-kavali/SPARK-17928. Authored-by: Devaraj K <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 8a54492 commit 1b75f3b

File tree

3 files changed

+95
-3
lines changed

3 files changed

+95
-3
lines changed

docs/configuration.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ of the most common options to set are:
169169
The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
170170
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
171171
other native overheads, etc. This tends to grow with the container size (typically 6-10%).
172-
This option is currently supported on YARN and Kubernetes.
172+
This option is currently supported on YARN, Mesos and Kubernetes.
173173
</td>
174174
</tr>
175175
<tr>

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ private[mesos] class MesosSubmitRequestServlet(
6868
private def newDriverId(submitDate: Date): String =
6969
f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
7070

71+
// These defaults copied from YARN
72+
private val MEMORY_OVERHEAD_FACTOR = 0.10
73+
private val MEMORY_OVERHEAD_MIN = 384
74+
7175
/**
7276
* Build a driver description from the fields specified in the submit request.
7377
*
@@ -98,6 +102,7 @@ private[mesos] class MesosSubmitRequestServlet(
98102
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
99103
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
100104
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
105+
val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
101106
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
102107
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
103108

@@ -112,13 +117,15 @@ private[mesos] class MesosSubmitRequestServlet(
112117
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
113118
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
114119
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
120+
val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
121+
math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
115122
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
116123
val submitDate = new Date()
117124
val submissionId = newDriverId(submitDate)
118125

119126
new MesosDriverDescription(
120-
name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
121-
command, request.sparkProperties, submissionId, submitDate)
127+
name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores,
128+
actualSuperviseDriver, command, request.sparkProperties, submissionId, submitDate)
122129
}
123130

124131
protected override def handleSubmit(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest.mesos
19+
20+
import javax.servlet.http.HttpServletResponse
21+
22+
import org.scalatest.mockito.MockitoSugar
23+
24+
import org.apache.spark.{SparkConf, SparkFunSuite}
25+
import org.apache.spark.deploy.TestPrematureExit
26+
import org.apache.spark.deploy.mesos.MesosDriverDescription
27+
import org.apache.spark.deploy.rest.{CreateSubmissionRequest, CreateSubmissionResponse, SubmitRestProtocolMessage, SubmitRestProtocolResponse}
28+
import org.apache.spark.internal.config
29+
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterPersistenceEngineFactory, MesosClusterScheduler}
30+
31+
class MesosRestServerSuite extends SparkFunSuite
32+
with TestPrematureExit with MockitoSugar {
33+
34+
test("test default driver overhead memory") {
35+
testOverheadMemory(new SparkConf(), "2000M", 2384)
36+
}
37+
38+
test("test driver overhead memory with overhead factor") {
39+
testOverheadMemory(new SparkConf(), "5000M", 5500)
40+
}
41+
42+
test("test configured driver overhead memory") {
43+
val conf = new SparkConf()
44+
conf.set(config.DRIVER_MEMORY_OVERHEAD.key, "1000")
45+
testOverheadMemory(conf, "2000M", 3000)
46+
}
47+
48+
def testOverheadMemory(conf: SparkConf, driverMemory: String, expectedResult: Int) {
49+
conf.set("spark.master", "testmaster")
50+
conf.set("spark.app.name", "testapp")
51+
conf.set(config.DRIVER_MEMORY.key, driverMemory)
52+
var actualMem = 0
53+
class TestMesosClusterScheduler extends MesosClusterScheduler(
54+
mock[MesosClusterPersistenceEngineFactory], conf) {
55+
override def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
56+
actualMem = desc.mem
57+
mock[CreateSubmissionResponse]
58+
}
59+
}
60+
61+
class TestServlet extends MesosSubmitRequestServlet(new TestMesosClusterScheduler, conf) {
62+
override def handleSubmit(
63+
requestMessageJson: String,
64+
requestMessage: SubmitRestProtocolMessage,
65+
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
66+
super.handleSubmit(requestMessageJson, requestMessage, responseServlet)
67+
}
68+
69+
override def findUnknownFields(
70+
requestJson: String,
71+
requestMessage: SubmitRestProtocolMessage): Array[String] = {
72+
Array()
73+
}
74+
}
75+
val servlet = new TestServlet()
76+
val request = new CreateSubmissionRequest()
77+
request.appResource = "testresource"
78+
request.mainClass = "mainClass"
79+
request.appArgs = Array("appArgs")
80+
request.environmentVariables = Map("envVar" -> "envVal")
81+
request.sparkProperties = conf.getAll.toMap
82+
servlet.handleSubmit("null", request, null)
83+
assert(actualMem == expectedResult)
84+
}
85+
}

0 commit comments

Comments
 (0)