TaskScheduler

The TaskScheduler is created and started in SparkContext; this article walks through its initialization.

1. TaskScheduler Initialization

1.1 Created and Started in SparkContext

        // Declarations
        private var _taskScheduler: TaskScheduler = _
        private var _schedulerBackend: SchedulerBackend = _

        private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
        private[spark] def taskScheduler: TaskScheduler = _taskScheduler
        private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
            _taskScheduler = ts
        }

        // Create and start the scheduler
        // Depending on the deploy mode, create the matching SchedulerBackend and TaskSchedulerImpl
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
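
For reference, the master string that drives the pattern match in createTaskScheduler normally comes from SparkConf (or spark-submit --master). A minimal sketch, assuming a local run; the master values in the comments are the standard URL formats handled by the branches shown below:

        import org.apache.spark.{SparkConf, SparkContext}

        // Each master string selects a different branch of createTaskScheduler:
        //   "local"                     -> single-threaded LocalSchedulerBackend
        //   "local[4]"                  -> LocalSchedulerBackend with 4 threads
        //   "local[4, 2]"               -> 4 threads, up to 2 task failures
        //   "spark://host:7077"         -> StandaloneSchedulerBackend
        //   "local-cluster[2, 1, 1024]" -> LocalSparkCluster: 2 workers, 1 core, 1024 MB each
        val conf = new SparkConf()
            .setAppName("scheduler-demo")
            .setMaster("local[4]")

        val sc = new SparkContext(conf)
        println(sc.master)   // "local[4]"
        sc.stop()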

1.1.1 Creating the SchedulerBackend and TaskSchedulerImpl for the Deploy Mode

        /**
         * Create a task scheduler based on a given master URL.
         * Return a 2-tuple of the scheduler backend and the task scheduler.
         */
        private def createTaskScheduler(
                sc: SparkContext,
                master: String,
                deployMode: String): (SchedulerBackend, TaskScheduler) = {
            import SparkMasterRegex._

            // When running locally, don't try to re-execute tasks on failure.
            val MAX_LOCAL_TASK_FAILURES = 1

            master match {
                // Local mode
                case "local" =>
                    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
                    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
                    scheduler.initialize(backend)
                    (backend, scheduler)

                // Local mode with a given number of threads
                case LOCAL_N_REGEX(threads) =>
                    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
                    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
                    val threadCount = if (threads == "*") localCpuCount else threads.toInt
                    if (threadCount <= 0) {
                        throw new SparkException(s"Asked to run locally with $threadCount threads")
                    }
                    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
                    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
                    scheduler.initialize(backend)
                    (backend, scheduler)

                case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
                    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
                    // local[*, M] means the number of cores on the computer with M failures
                    // local[N, M] means exactly N threads with M failures
                    val threadCount = if (threads == "*") localCpuCount else threads.toInt
                    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
                    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
                    scheduler.initialize(backend)
                    (backend, scheduler)

                // Standalone mode
                case SPARK_REGEX(sparkUrl) =>
                    val scheduler = new TaskSchedulerImpl(sc)
                    val masterUrls = sparkUrl.split(",").map("spark://" + _)
                    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
                    scheduler.initialize(backend)
                    (backend, scheduler)

                // Local cluster mode
                case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
                    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
                    val memoryPerSlaveInt = memoryPerSlave.toInt
                    if (sc.executorMemory > memoryPerSlaveInt) {
                        throw new SparkException(
                            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                                memoryPerSlaveInt, sc.executorMemory))
                    }

                    val scheduler = new TaskSchedulerImpl(sc)
                    val localCluster = new LocalSparkCluster(
                        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
                    val masterUrls = localCluster.start()
                    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
                    scheduler.initialize(backend)
                    backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
                        localCluster.stop()
                    }
                    (backend, scheduler)

                // Look up the ExternalClusterManager for this master URL
                case masterUrl =>
                    val cm = getClusterManager(masterUrl) match {
                        case Some(clusterMgr) => clusterMgr
                        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
                    }
                    try {
                        val scheduler = cm.createTaskScheduler(sc, masterUrl)
                        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                        // Initialize the cluster manager with the scheduler and backend
                        cm.initialize(scheduler, backend)
                        (backend, scheduler)
                    } catch {
                        case se: SparkException => throw se
                        case NonFatal(e) =>
                            throw new SparkException("External scheduler cannot be instantiated", e)
                    }
            }
        }
       
        private def getClusterManager(url: String): Option[ExternalClusterManager] = {
            val loader = Utils.getContextOrSparkClassLoader
            val serviceLoaders =
                ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
            if (serviceLoaders.size > 1) {
                throw new SparkException(
                    s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
            }
            serviceLoaders.headOption
        }
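
The last branch delegates to a pluggable cluster manager discovered through Java's ServiceLoader: getClusterManager keeps the registered ExternalClusterManager whose canCreate accepts the master URL. Below is a minimal sketch of such a plug-in, based only on the signatures used in the code above. MyClusterManager and the my:// URL scheme are made-up names, and for illustration it simply reuses LocalSchedulerBackend as its backend; ExternalClusterManager is package-private in Spark, so a real plug-in typically lives under an org.apache.spark package and is registered in META-INF/services/org.apache.spark.scheduler.ExternalClusterManager.

        // Hypothetical plug-in picked up by getClusterManager() for URLs like "my://...".
        package org.apache.spark.scheduler

        import org.apache.spark.SparkContext
        import org.apache.spark.scheduler.local.LocalSchedulerBackend

        class MyClusterManager extends ExternalClusterManager {

            // getClusterManager() keeps only the managers whose canCreate returns true
            override def canCreate(masterURL: String): Boolean = masterURL.startsWith("my://")

            override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
                new TaskSchedulerImpl(sc)

            // For illustration we reuse LocalSchedulerBackend; a real manager would return
            // a backend that talks to its own cluster
            override def createSchedulerBackend(
                    sc: SparkContext,
                    masterURL: String,
                    scheduler: TaskScheduler): SchedulerBackend =
                new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)

            // Mirrors what cm.initialize(scheduler, backend) is expected to do
            override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
                scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
        }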

1.2 TaskSchedulerImpl

1.2.1 Calling initialize() to Initialize TaskSchedulerImpl

  • Create the Pool, which caches the scheduling queue, the scheduling algorithm, and the collection of TaskSetManagers
  • Create the SchedulableBuilder, which is used to operate on the scheduling queue inside the Pool

        def initialize(backend: SchedulerBackend) {  // hold a reference to the SchedulerBackend
            this.backend = backend
            // temporarily set rootPool name to empty; create an empty Pool
            rootPool = new Pool("", schedulingMode, 0, 0)
            schedulableBuilder = {
                // scheduling policy: FIFO or FAIR
                schedulingMode match {
                    case SchedulingMode.FIFO =>
                        new FIFOSchedulableBuilder(rootPool)
                    case SchedulingMode.FAIR =>
                        new FairSchedulableBuilder(rootPool, conf)
                    case _ =>
                        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
                }
            }
            schedulableBuilder.buildPools()
        }
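
The schedulingMode checked above is taken from the spark.scheduler.mode configuration, which defaults to FIFO. A minimal sketch of enabling the fair scheduler, assuming a local run; the pool name "production" is just an example:

        import org.apache.spark.{SparkConf, SparkContext}

        // FIFO is the default; FAIR makes TaskSchedulerImpl build a FairSchedulableBuilder,
        // which reads pool definitions from the file given by spark.scheduler.allocation.file
        val conf = new SparkConf()
            .setAppName("fair-scheduling-demo")
            .setMaster("local[4]")
            .set("spark.scheduler.mode", "FAIR")

        val sc = new SparkContext(conf)

        // Jobs submitted from this thread go into the "production" pool (an example name);
        // pools not defined in the allocation file fall back to default settings
        sc.setLocalProperty("spark.scheduler.pool", "production")
        sc.parallelize(1 to 100).count()
        sc.stop()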

1.3 Started in SparkContext

        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()

1.3.1 TaskSchedulerImpl.start() Calls SchedulerBackend.start()

        override def start() {
            backend.start()

            if (!isLocal && conf.getBoolean("spark.speculation", false)) {
                logInfo("Starting speculative execution thread")
                speculationScheduler.scheduleAtFixedRate(new Runnable {
                    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
                        checkSpeculatableTasks()
                    }
                }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
            }
        }
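
Speculation is disabled by default, so the branch above only starts the checker thread when spark.speculation is enabled and the scheduler is not local. A minimal sketch of the related configuration; the values shown are illustrative, not recommendations:

        import org.apache.spark.SparkConf

        // Enable speculative re-launching of slow tasks; the scheduler checks for
        // speculatable tasks every spark.speculation.interval
        val conf = new SparkConf()
            .set("spark.speculation", "true")
            // how often checkSpeculatableTasks() runs (SPECULATION_INTERVAL_MS above)
            .set("spark.speculation.interval", "100ms")
            // a task is speculatable when it runs longer than multiplier * median duration...
            .set("spark.speculation.multiplier", "1.5")
            // ...and at least this fraction of tasks in the stage have finished
            .set("spark.speculation.quantile", "0.75")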

1.3.2 LocalSchedulerBackend as an Example

1) The corresponding initialization happens in the createTaskScheduler method of SparkContext shown earlier: after the TaskScheduler instance (concretely a TaskSchedulerImpl) is constructed, the SchedulerBackend instance built alongside it (concretely a LocalSchedulerBackend) is passed to the scheduler's initialize method.

2) Once the TaskScheduler instance is built, its start method is called, and that method first calls the SchedulerBackend's start method.

3) Inside the SchedulerBackend's start method a LocalEndpoint instance is constructed, and within it an Executor is instantiated; the Executor is what actually runs tasks.

4) From then on, whenever the TaskScheduler schedules work, it calls the SchedulerBackend's reviveOffers() method, which sends a ReviveOffers message to the LocalEndpoint instance.

5) Finally, when the LocalEndpoint instance handles the ReviveOffers message, it launches the tasks; other messages are handled in a similar way (a toy sketch of this round trip follows this list).
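
Below is a self-contained toy sketch of that ReviveOffers round trip. It is not the real Spark source: ToyScheduler, ToyLocalBackend, and the message loop are made-up stand-ins for TaskSchedulerImpl, LocalSchedulerBackend, and LocalEndpoint, and the thread pool plays the role of the Executor.

        import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
        import scala.collection.mutable

        sealed trait Message
        case object ReviveOffers extends Message

        // Stand-in for TaskSchedulerImpl: hands out pending task ids when offered free cores
        class ToyScheduler(taskIds: Seq[Long]) {
            private val pending = mutable.Queue(taskIds: _*)
            def resourceOffer(freeCores: Int): Seq[Long] = synchronized {
                (1 to freeCores).flatMap { _ =>
                    if (pending.nonEmpty) Some(pending.dequeue()) else None
                }
            }
        }

        // Stand-in for LocalSchedulerBackend plus its LocalEndpoint: reviveOffers() only
        // enqueues a message; the message loop asks the scheduler for tasks and runs them
        // on a local thread pool that plays the role of the Executor
        class ToyLocalBackend(scheduler: ToyScheduler, totalCores: Int) {
            private val inbox = new LinkedBlockingQueue[Message]()
            private val executorPool = Executors.newFixedThreadPool(totalCores)

            def reviveOffers(): Unit = inbox.put(ReviveOffers)

            private val loop = new Thread(() => {
                try {
                    while (true) {
                        inbox.take() match {
                            case ReviveOffers =>
                                scheduler.resourceOffer(totalCores).foreach { id =>
                                    executorPool.submit(new Runnable {
                                        override def run(): Unit = println(s"launched task $id")
                                    })
                                }
                        }
                    }
                } catch {
                    case _: InterruptedException => ()  // stop() interrupts the loop
                }
            }, "toy-local-endpoint")
            loop.setDaemon(true)
            loop.start()

            def stop(): Unit = {
                loop.interrupt()
                executorPool.shutdown()
                executorPool.awaitTermination(1, TimeUnit.SECONDS)
            }
        }

        object ToyLocalDemo extends App {
            val backend = new ToyLocalBackend(new ToyScheduler(Seq(1L, 2L, 3L)), totalCores = 2)
            backend.reviveOffers()   // what the scheduler does when a TaskSet is submitted
            Thread.sleep(500)
            backend.stop()
        }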

That concludes this article on the TaskScheduler. Thanks for reading.
