@@ -153,120 +153,7 @@ class SparkContext(
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler: TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster
-    val MESOS_REGEX = """mesos://(.*)""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
-
-    master match {
-      case "local" =>
-        new LocalScheduler(1, 0, this)
-
-      case LOCAL_N_REGEX(threads) =>
-        new LocalScheduler(threads.toInt, 0, this)
-
-      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        new LocalScheduler(threads.toInt, maxFailures.toInt, this)
-
-      case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        scheduler
-
-      case SIMR_REGEX(simrUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
-        scheduler.initialize(backend)
-        scheduler
-
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
-        val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
-        }
-
-        val scheduler = new ClusterScheduler(this)
-        val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
-        val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
-          localCluster.stop()
-        }
-        scheduler
-
-      case "yarn-standalone" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
-        scheduler.initialize(backend)
-        scheduler
-
-      case "yarn-client" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        val backend = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
-          cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        scheduler.initialize(backend)
-        scheduler
-
-      case MESOS_REGEX(mesosUrl) =>
-        MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(this)
-        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
-        val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, this, mesosUrl, appName)
-        } else {
-          new MesosSchedulerBackend(scheduler, this, mesosUrl, appName)
-        }
-        scheduler.initialize(backend)
-        scheduler
-
-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
-    }
-  }
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
@@ -1137,6 +1024,124 @@ object SparkContext {
       .map(Utils.memoryStringToMb)
       .getOrElse(512)
   }
+
+  // Creates a task scheduler based on a given master URL. Extracted for testing.
+  private
+  def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+    // Regular expression used for local[N] master format
+    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression for local[N, maxRetries], used in tests with failing tasks
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+    // Regular expression for connecting to Spark deploy clusters
+    val SPARK_REGEX = """spark://(.*)""".r
+    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+    val MESOS_REGEX = """(mesos|zk)://.*""".r
+    // Regular expression for connection to Simr cluster
+    val SIMR_REGEX = """simr://(.*)""".r
+
+    master match {
+      case "local" =>
+        new LocalScheduler(1, 0, sc)
+
+      case LOCAL_N_REGEX(threads) =>
+        new LocalScheduler(threads.toInt, 0, sc)
+
+      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+        new LocalScheduler(threads.toInt, maxFailures.toInt, sc)
+
+      case SPARK_REGEX(sparkUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        scheduler
+
+      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
+        val memoryPerSlaveInt = memoryPerSlave.toInt
+        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+          throw new SparkException(
+            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+        }
+
+        val scheduler = new ClusterScheduler(sc)
+        val localCluster = new LocalSparkCluster(
+          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+        val masterUrls = localCluster.start()
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+          localCluster.stop()
+        }
+        scheduler
+
+      case "yarn-standalone" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+        } catch {
+          // TODO: Enumerate the exact reasons why it can fail
+          // But irrespective of it, it means we cannot proceed !
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        scheduler.initialize(backend)
+        scheduler
+
+      case "yarn-client" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        val backend = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        scheduler.initialize(backend)
+        scheduler
+
+      case mesosUrl @ MESOS_REGEX(_) =>
+        MesosNativeLibrary.load()
+        val scheduler = new ClusterScheduler(sc)
+        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
+        val backend = if (coarseGrained) {
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
+        } else {
+          new MesosSchedulerBackend(scheduler, sc, url, appName)
+        }
+        scheduler.initialize(backend)
+        scheduler
+
+      case SIMR_REGEX(simrUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
+        scheduler.initialize(backend)
+        scheduler
+
+      case _ =>
+        throw new SparkException("Could not parse Master URL: '" + master + "'")
+    }
+  }
 }
 
 /**
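
Note: the new method is commented as having been "extracted for testing". Below is a minimal sketch of what such a test could look like; it assumes ScalaTest's FunSuite, a hypothetical suite name, and that LocalScheduler lives at org.apache.spark.scheduler.local.LocalScheduler, none of which are part of this commit. Because createTaskScheduler is private to the SparkContext companion, the sketch exercises the dispatch logic indirectly, through the public SparkContext constructor and the package-private taskScheduler field, rather than by calling the method directly.

package org.apache.spark

import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.scheduler.local.LocalScheduler  // assumed import path

// Hypothetical suite; not part of this commit.
class SchedulerCreationSketchSuite extends FunSuite with BeforeAndAfterEach {

  private var sc: SparkContext = _

  // Stop any context created by the previous test so its scheduler is cleaned up.
  override def afterEach() {
    if (sc != null) {
      sc.stop()
      sc = null
    }
  }

  test("'local' master yields a LocalScheduler") {
    sc = new SparkContext("local", "test")
    assert(sc.taskScheduler.isInstanceOf[LocalScheduler])
  }

  test("'local[4]' master also yields a LocalScheduler") {
    sc = new SparkContext("local[4]", "test")
    assert(sc.taskScheduler.isInstanceOf[LocalScheduler])
  }

  test("an unparseable master URL is rejected with a SparkException") {
    intercept[SparkException] {
      sc = new SparkContext("not-a-master-url", "test")
    }
  }
}

Going through the constructor keeps the sketch within the visible API; a test could only call createTaskScheduler directly if its visibility were widened, e.g. to private[spark].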