@@ -153,121 +153,7 @@ class SparkContext(
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler: TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-    val MESOS_REGEX = """(mesos|zk)://.*""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
-
-    master match {
-      case "local" =>
-        new LocalScheduler(1, 0, this)
-
-      case LOCAL_N_REGEX(threads) =>
-        new LocalScheduler(threads.toInt, 0, this)
-
-      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        new LocalScheduler(threads.toInt, maxFailures.toInt, this)
-
-      case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        scheduler
-
-      case SIMR_REGEX(simrUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
-        scheduler.initialize(backend)
-        scheduler
-
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
-        val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
-        }
-
-        val scheduler = new ClusterScheduler(this)
-        val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
-        val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
-          localCluster.stop()
-        }
-        scheduler
-
-      case "yarn-standalone" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
-        scheduler.initialize(backend)
-        scheduler
-
-      case "yarn-client" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        val backend = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
-          cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        scheduler.initialize(backend)
-        scheduler
-
-      case mesosUrl @ MESOS_REGEX(_) =>
-        MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(this)
-        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
-        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
-        val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, this, url, appName)
-        } else {
-          new MesosSchedulerBackend(scheduler, this, url, appName)
-        }
-        scheduler.initialize(backend)
-        scheduler
-
-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
-    }
-  }
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
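For reference, the master-URL formats that this match block accepts, and the scheduler each one selects. This is an illustrative sketch only; the host names, thread counts, and memory sizes below are made up, not taken from the diff:

    // "local"                          -> LocalScheduler, 1 thread, no task retries
    // "local[4]"                       -> LocalScheduler, 4 threads
    // "local[4, 2]"                    -> LocalScheduler, 4 threads, up to 2 failures per task
    // "local-cluster[2, 1, 512]"       -> LocalSparkCluster: 2 workers, 1 core and 512 MB each
    // "spark://host1:7077,host2:7077"  -> standalone deploy cluster (multiple masters allowed)
    // "mesos://host:5050", "zk://..."  -> Mesos; coarse-grained if spark.mesos.coarse=true
    // "simr://path"                    -> Spark-in-MapReduce backend
    // "yarn-standalone", "yarn-client" -> YARN schedulers, loaded via reflection
    new SparkContext("local[4]", "demo-app")  // "demo-app" is a made-up appName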
@@ -1137,6 +1023,124 @@ object SparkContext {
       .map(Utils.memoryStringToMb)
       .getOrElse(512)
   }
+
+  // Creates a task scheduler based on a given master URL. Extracted for testing.
+  private
+  def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+    // Regular expression used for local[N] master format
+    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression for local[N, maxRetries], used in tests with failing tasks
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+    // Regular expression for connecting to Spark deploy clusters
+    val SPARK_REGEX = """spark://(.*)""".r
+    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+    val MESOS_REGEX = """(mesos|zk)://.*""".r
+    // Regular expression for connection to Simr cluster
+    val SIMR_REGEX = """simr://(.*)""".r
+
+    master match {
+      case "local" =>
+        new LocalScheduler(1, 0, sc)
+
+      case LOCAL_N_REGEX(threads) =>
+        new LocalScheduler(threads.toInt, 0, sc)
+
+      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+        new LocalScheduler(threads.toInt, maxFailures.toInt, sc)
+
+      case SPARK_REGEX(sparkUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        scheduler
+
+      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
+        val memoryPerSlaveInt = memoryPerSlave.toInt
+        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+          throw new SparkException(
+            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+        }
+
+        val scheduler = new ClusterScheduler(sc)
+        val localCluster = new LocalSparkCluster(
+          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+        val masterUrls = localCluster.start()
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+          localCluster.stop()
+        }
+        scheduler
+
+      case "yarn-standalone" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+        } catch {
+          // TODO: Enumerate the exact reasons why it can fail
+          // But irrespective of it, it means we cannot proceed !
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        scheduler.initialize(backend)
+        scheduler
+
+      case "yarn-client" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        val backend = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        scheduler.initialize(backend)
+        scheduler
+
+      case mesosUrl @ MESOS_REGEX(_) =>
+        MesosNativeLibrary.load()
+        val scheduler = new ClusterScheduler(sc)
+        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
+        val backend = if (coarseGrained) {
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
+        } else {
+          new MesosSchedulerBackend(scheduler, sc, url, appName)
+        }
+        scheduler.initialize(backend)
+        scheduler
+
+      case SIMR_REGEX(simrUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
+        scheduler.initialize(backend)
+        scheduler
+
+      case _ =>
+        throw new SparkException("Could not parse Master URL: '" + master + "'")
+    }
+  }
 }
 
 /**
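The comment on createTaskScheduler says it was extracted for testing. A minimal sketch of what such a test could look like, assuming ScalaTest's PrivateMethodTester is used to reach the private method; the suite name, helper, and test cases below are hypothetical and not part of this commit:

    package org.apache.spark

    import org.scalatest.{FunSuite, PrivateMethodTester}
    import org.apache.spark.scheduler.TaskScheduler
    import org.apache.spark.scheduler.local.LocalScheduler

    // Hypothetical suite: drives master-URL parsing through the private
    // factory without starting the scheduler it returns.
    class SchedulerCreationSketch extends FunSuite with PrivateMethodTester {

      // Invoke the private method on the SparkContext companion object.
      def createTaskScheduler(master: String): TaskScheduler = {
        val sc = new SparkContext("local", "test")
        try {
          val method = PrivateMethod[TaskScheduler]('createTaskScheduler)
          SparkContext invokePrivate method(sc, master, "test")
        } finally {
          sc.stop()
        }
      }

      test("local[N] yields a LocalScheduler") {
        val sched = createTaskScheduler("local[4]")
        assert(sched.isInstanceOf[LocalScheduler])
      }

      test("an unrecognized master URL is rejected") {
        intercept[SparkException] {
          createTaskScheduler("not-a-master-url")
        }
      }
    }

Because the factory now lives on the SparkContext companion object and takes the context, master, and appName as explicit parameters, each case arm can be exercised this way without duplicating the constructor's setup.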