
Spark Speculative Execution: spark.speculation

1. Background

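  Speculative execution addresses the following situation in a distributed cluster. Because of program bugs, load imbalance, or uneven resource distribution, the tasks of a single job run at different speeds, and some tasks are clearly slower than the rest (for example, one task of a job is only 10% complete while all other tasks have already finished). These stragglers hold back the progress of the whole job. To avoid this, Hadoop launches a backup task (a speculative task) for the slow task, so that the speculative task and the original task process the same data at the same time; whichever finishes first provides the final result. Spark's spark.speculation applies the same idea to the tasks of a stage.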
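To make the "whichever finishes first wins" idea concrete, here is a minimal Scala sketch built on the standard library's Future.firstCompletedOf. It is not how Hadoop or Spark implement speculation; the helper name firstFinished and the sleep-based straggler are hypothetical and exist only for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BackupTaskSketch {
  // Hypothetical helper (not a Hadoop or Spark API): start the original attempt and a
  // backup copy of the same work concurrently, and return whichever result arrives first.
  def firstFinished[T](original: => T, backup: => T, timeout: FiniteDuration): T = {
    val originalRun = Future(original) // the original attempt
    val backupRun = Future(backup)     // the speculative (backup) attempt
    // firstCompletedOf completes with the result of the first future to complete.
    // The slower attempt is not cancelled here; its result is simply ignored.
    Await.result(Future.firstCompletedOf(Seq(originalRun, backupRun)), timeout)
  }
}

// The original attempt is a straggler (it sleeps for 1 s), so the backup's result is used.
val winner = BackupTaskSketch.firstFinished(
  { blocking(Thread.sleep(1000)); "original" },
  "backup",
  5.seconds
)
println(winner)
```

The sleep is wrapped in blocking so that the global execution context can add a thread for the backup even on a single-core machine.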



2. Caveats

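  1. Use with caution. In severe cases, speculative copies can occupy all available resources and keep them from being released in time.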

3. Source code

```scala
/**
 * TaskSchedulerImpl is initialized and started in SparkContext. During initialization it
 * receives a SparkDeploySchedulerBackend object; starting it simply calls start(). Inside
 * start(), it checks whether speculative execution is enabled, which is controlled by the
 * spark.speculation property (disabled by default).
 */
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
  logInfo("Starting speculative execution thread")
  speculationScheduler.scheduleWithFixedDelay(new Runnable {
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      // Check all active jobs for speculatable tasks.
      checkSpeculatableTasks()
    }
  }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
```


```scala
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

// Duplicate copies of a task will only be launched if the original copy has been running for
// at least this amount of time. This is to avoid the overhead of launching speculative copies
// of tasks that are very short.
val MIN_TIME_TO_SPECULATION = 100

// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks(): Unit = {
  var shouldRevive = false
  synchronized {
    shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
  }
  // If any task set marked new speculatable tasks, ask the backend to make fresh resource
  // offers so that the speculative copies can be launched.
  if (shouldRevive) {
    backend.reviveOffers()
  }
}
```
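start() registers checkSpeculatableTasks() as a periodic job with a fixed delay of SPECULATION_INTERVAL_MS. The sketch below reproduces that scheduling pattern with the JDK's ScheduledExecutorService, assuming speculationScheduler behaves like one (the listing calls scheduleWithFixedDelay on it with the same argument shape). The check itself is a hypothetical stand-in that only counts how often it runs.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PeriodicCheckSketch {
  // How many times the stand-in check has run.
  val checks = new AtomicInteger(0)

  // Hypothetical stand-in for checkSpeculatableTasks(): it only records that it ran.
  def checkSpeculatableTasks(): Unit = {
    checks.incrementAndGet()
  }

  // Schedules the check on a single-threaded scheduler with a fixed delay between the end of
  // one run and the start of the next, then waits until it has run `times` times.
  def runChecks(intervalMs: Long, times: Int): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(times)
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        checkSpeculatableTasks()
        done.countDown()
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    try done.await(5, TimeUnit.SECONDS) // true if the check ran `times` times within 5 s
    finally scheduler.shutdownNow()
  }
}

val completed = PeriodicCheckSketch.runChecks(intervalMs = 100, times = 3)
println(s"completed=$completed, checks=${PeriodicCheckSketch.checks.get()}")
```

With a fixed delay (rather than a fixed rate), a slow check never overlaps with the next one: the next run is scheduled only after the previous one has returned.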


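```scala
// Pool: ask every child Schedulable and report whether any of them found speculatable tasks
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  var shouldRevive = false
  for (schedulable <- schedulableQueue.asScala) {
    shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
  }
  shouldRevive
}
```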
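Because a pool's children can be nested Pools or TaskSetManagers, the check recurses down the pool tree. The following simplified Scala model shows that pattern; the classes are hypothetical stand-ins, not Spark's own, and the leaf returns a fixed answer instead of inspecting real tasks.

```scala
import scala.collection.mutable.ArrayBuffer

object PoolSketch {
  // Simplified stand-in for Spark's Schedulable trait.
  trait Schedulable {
    def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
  }

  // Hypothetical leaf standing in for a TaskSetManager: it records that it was visited
  // and reports a fixed answer.
  final class LeafTaskSet(name: String, found: Boolean, visited: ArrayBuffer[String])
      extends Schedulable {
    override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
      visited += name
      found
    }
  }

  // Mirrors the Pool logic: OR together the answers of all children.
  // Boolean `|` does not short-circuit, so every child is checked even after one returns true.
  final class Pool(children: Seq[Schedulable]) extends Schedulable {
    override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
      var shouldRevive = false
      for (child <- children) {
        shouldRevive |= child.checkSpeculatableTasks(minTimeToSpeculation)
      }
      shouldRevive
    }
  }
}

import PoolSketch._

val visited = ArrayBuffer.empty[String]
val root = new Pool(Seq(
  new LeafTaskSet("stage-0", true, visited),
  new Pool(Seq(new LeafTaskSet("stage-1", false, visited)))
))
val revive = root.checkSpeculatableTasks(100)
```

Using |= rather than || matters here: with ||, the second task set would never be visited once the first returned true, so its slow tasks would not be marked in this round.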

  Clicking through schedulable.checkSpeculatableTasks() leads to the abstract method declared in the private[spark] trait Schedulable:

```scala
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
```

Next, look at schedulable <- schedulableQueue.asScala and the definition of schedulableQueue:

```scala
// Queue of child Schedulables (Pools or TaskSetManagers)
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
```



Each element of this queue is either a nested Pool or a TaskSetManager, so the recursion ends in TaskSetManager, which does the actual check:

```scala
/**
 * Check for tasks to be speculated and return true if there are any. This is called periodically
 * by the TaskScheduler.
 */
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // Can't speculate if we only have one task, and no need to speculate if the task set is a
  // zombie (a zombie task set has no more tasks to launch).
  if (isZombie || numTasks == 1) {
    return false
  }
  var foundTasks = false
  // Total number of tasks * SPECULATION_QUANTILE (default 0.75, set via spark.speculation.quantile)
  val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
  logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)

  // Have at least SPECULATION_QUANTILE (75% by default) of the tasks succeeded, and at least one?
  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    // Median duration of the successful tasks
    var medianDuration = successfulTaskDurations.median
    // threshold = max(median * SPECULATION_MULTIPLIER (default 1.5, set via
    // spark.speculation.multiplier), minTimeToSpeculation)
    val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
    // TODO: Threshold should also look at standard deviation of task durations and have a lower
    // bound based on that.
    logDebug("Task length threshold for speculation: " + threshold)

    // Scan the running tasks of this TaskSet. A task is added to speculatableTasks if it has
    // not succeeded, has exactly one running copy, has run longer than threshold, and is not
    // already marked as speculatable.
    for (tid <- runningTasksSet) {
      val info = taskInfos(tid)
      val index = info.index
      if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
        !speculatableTasks.contains(index)) {
        logInfo(
          "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
            .format(index, taskSet.id, info.host, threshold))
        speculatableTasks += index
        foundTasks = true
      }
    }
  }
  foundTasks
}
```

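  • The comments in the checking logic make it clear: once the number of successful tasks reaches 75% of all tasks (configurable via spark.speculation.quantile), Spark takes the median runtime of the successful tasks and multiplies it by 1.5 (configurable via spark.speculation.multiplier) to obtain a runtime threshold, which is never lower than minTimeToSpeculation. Any running task whose runtime exceeds this threshold is marked for speculation. In short, Spark speculates on the tasks that are holding back overall progress, in order to speed up the whole stage.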
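A minimal sketch of this threshold rule, written as a pure Scala function so the arithmetic is easy to verify. The function and parameter names, the 100 ms default for the minimum, and the simple upper median are assumptions of this sketch; Spark computes the median over its own record of successful task durations.

```scala
object SpeculationThresholdSketch {
  // Returns Some(threshold in ms) once enough tasks have succeeded, otherwise None.
  // quantile, multiplier and minTimeMs stand in for spark.speculation.quantile (0.75),
  // spark.speculation.multiplier (1.5) and minTimeToSpeculation (100 ms assumed here).
  def threshold(
      numTasks: Int,
      successfulDurationsMs: Seq[Long],
      quantile: Double = 0.75,
      multiplier: Double = 1.5,
      minTimeMs: Long = 100): Option[Double] = {
    val minFinished = (quantile * numTasks).floor.toInt
    val succeeded = successfulDurationsMs.size
    if (succeeded >= minFinished && succeeded > 0) {
      val sorted = successfulDurationsMs.sorted
      // Upper median; Spark's own median computation may differ for even counts.
      val median = sorted(sorted.size / 2).toDouble
      Some(math.max(multiplier * median, minTimeMs.toDouble))
    } else None
  }

  // Indices of running tasks that have been running longer than the threshold.
  // (Spark additionally requires that the task has not succeeded, has exactly one running
  // copy, and is not already marked speculatable.)
  def speculatable(runningElapsedMs: Map[Int, Long], threshold: Double): Set[Int] =
    runningElapsedMs.collect { case (index, elapsed) if elapsed > threshold => index }.toSet
}

// 4 tasks, 3 finished: 3 >= floor(0.75 * 4), median 1100 ms, threshold = 1.5 * 1100 = 1650 ms.
val t = SpeculationThresholdSketch.threshold(4, Seq(1000L, 1200L, 1100L))
val slow = t.map(SpeculationThresholdSketch.speculatable(Map(3 -> 2000L), _))
```

For very short tasks the minimum dominates: with successful durations of 10, 20 and 30 ms, 1.5 times the median is only 30 ms, so the threshold stays at the 100 ms floor.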


3.1 When are speculative tasks scheduled?


Speculative tasks are handed out in TaskSetManager.dequeueTask: regular pending tasks are tried first at each allowed locality level, and a speculative task is dequeued only when none of them can be scheduled on this executor.

```scala
/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 *
 * @return An option containing (task index within the task set, locality, is speculative?)
 */
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  // dequeueTaskFromList() removes a pending task from the given list and returns its index,
  // or None if the list is empty.
  // PROCESS_LOCAL: the data is in the same JVM, i.e. on the same executor. This is the best
  // data locality.
  for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  // NODE_LOCAL: the data is on the same node, e.g. in another executor on that node, or an HDFS
  // block happens to be stored on that node. Slightly slower than PROCESS_LOCAL because the
  // data has to be passed between processes or read from files.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  // NO_PREF: the data is equally fast to access from anywhere; there is no locality preference.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }

  // RACK_LOCAL: the data is on another node in the same rack. It must be transferred over the
  // network (plus file I/O), so this is slower than NODE_LOCAL.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }

  // ANY: the data is elsewhere on the network, outside this rack; this is the slowest level.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }

  // find a speculative task if all other tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
```
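The fallback order in dequeueTask can be modelled with an ordered enumeration: try regular pending tasks at each allowed locality level from best to worst, and only then look for a speculative task. This is a simplified, hypothetical model: Locality, firstAt and dequeue are made up for illustration, and it ignores details such as Spark reporting NO_PREF tasks as PROCESS_LOCAL.

```scala
object LocalityFallbackSketch {
  // Locality levels from best to worst, in the order dequeueTask tries them.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }

  // A level may be used if it is no worse than the maximum level currently allowed.
  def isAllowed(maxLocality: Locality.Value, level: Locality.Value): Boolean =
    level <= maxLocality

  // Returns the first pending task at the best allowed level, if any.
  def firstAt(
      queues: Map[Locality.Value, List[Int]],
      maxLocality: Locality.Value): Option[(Int, Locality.Value)] =
    Locality.values.toSeq
      .filter(level => isAllowed(maxLocality, level))
      .collectFirst { case level if queues.getOrElse(level, Nil).nonEmpty =>
        (queues(level).head, level)
      }

  // Regular tasks first; a speculative task only if no regular task can be scheduled.
  def dequeue(
      pending: Map[Locality.Value, List[Int]],
      speculative: Map[Locality.Value, List[Int]],
      maxLocality: Locality.Value): Option[(Int, Locality.Value, Boolean)] =
    firstAt(pending, maxLocality).map { case (index, level) => (index, level, false) }
      .orElse(firstAt(speculative, maxLocality).map { case (index, level) => (index, level, true) })
}

import LocalityFallbackSketch._
import LocalityFallbackSketch.Locality._

val pending = Map(RACK_LOCAL -> List(7))
val speculative = Map(NODE_LOCAL -> List(3))
// With NODE_LOCAL as the limit, the rack-local regular task is not allowed yet,
// so the node-local speculative task is chosen instead.
val underDelay = dequeue(pending, speculative, NODE_LOCAL)
// Once ANY is allowed, the regular rack-local task wins over the speculative one.
val relaxed = dequeue(pending, speculative, ANY)
```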


```scala
/**
 * Return a speculative task for a given executor if any are available. The task should not have
 * an attempt running on this host, in case the host is slow. In addition, the task should meet
 * the given locality constraint.
 */
// Labeled as protected to allow tests to override providing speculative tasks if necessary
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  // Some tasks may have succeeded between being marked speculatable and now,
  // so drop those from the speculatable set first.
  speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

  // Can the task run on this executor's host? Two conditions:
  // - the task has no attempt running on this host;
  // - the task is not blacklisted on this executor or node (it failed there before and the
  //   blacklist timeout has not yet expired).
  def canRunOnHost(index: Int): Boolean = {
    !hasAttemptOnHost(index, host) &&
      !isTaskBlacklistedOnExecOrNode(index, execId, host)
  }

  // Only search if some tasks are marked as speculatable
  if (!speculatableTasks.isEmpty) {
    // Check for process-local tasks; note that tasks can be process-local
    // on multiple nodes when we replicate cached blocks, as in Spark Streaming
    // Iterate over the task indices that may launch on this executor
    for (index <- speculatableTasks if canRunOnHost(index)) {
      // Preferred locations of the task
      val prefs = tasks(index).preferredLocations
      val executors = prefs.flatMap(_ match {
        case e: ExecutorCacheTaskLocation => Some(e.executorId)
        case _ => None
      })
      // If a preferred location is a cached block on this very executor, return the task's
      // index within the task set together with its locality level
      if (executors.contains(execId)) {
        speculatableTasks -= index
        return Some((index, TaskLocality.PROCESS_LOCAL))
      }
    }

    // Check for node-local tasks
    // This check implements delay scheduling: even speculative tasks try to launch at the best
    // locality level currently allowed
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val locations = tasks(index).preferredLocations.map(_.host)
        if (locations.contains(host)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.NODE_LOCAL))
        }
      }
    }

    // Check for no-preference tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val locations = tasks(index).preferredLocations
        if (locations.size == 0) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }
    }

    // Check for rack-local tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for (rack <- sched.getRackForHost(host)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
          if (racks.contains(rack)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.RACK_LOCAL))
          }
        }
      }
    }

    // Check for non-local tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        speculatableTasks -= index
        return Some((index, TaskLocality.ANY))
      }
    }
  }

  None
}
```
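The host filter in dequeueSpeculativeTask can be isolated as a small pure function. This hypothetical sketch ignores locality levels and replaces hasAttemptOnHost and isTaskBlacklistedOnExecOrNode with plain maps and sets; it only shows how finished tasks are pruned and how a copy is kept off hosts that already run the task or have blacklisted it.

```scala
object SpeculativeDequeueSketch {
  // Hypothetical, simplified version of the host filter in dequeueSpeculativeTask.
  // Returns the chosen task index (if any) and the speculatable set that remains afterwards.
  def dequeue(
      speculatable: Set[Int],
      successful: Set[Int],
      attemptHosts: Map[Int, Set[String]],
      blacklisted: Set[(Int, String)],
      host: String): (Option[Int], Set[Int]) = {
    // Like speculatableTasks.retain(...): forget tasks that succeeded in the meantime.
    val stillPending = speculatable.filterNot(successful)
    // Same two conditions as canRunOnHost: no attempt of the task already runs on this host,
    // and the task is not blacklisted there.
    def canRunOnHost(index: Int): Boolean =
      !attemptHosts.getOrElse(index, Set.empty[String]).contains(host) &&
        !blacklisted.contains((index, host))
    // Sorted only to make the choice deterministic in this sketch.
    stillPending.toSeq.sorted.find(canRunOnHost) match {
      case Some(index) => (Some(index), stillPending - index)
      case None        => (None, stillPending)
    }
  }
}

// Task 5 already succeeded, and task 1 already runs on hostA, so hostA gets a copy of task 2.
val (onA, leftAfterA) = SpeculativeDequeueSketch.dequeue(
  speculatable = Set(1, 2, 5),
  successful = Set(5),
  attemptHosts = Map(1 -> Set("hostA"), 2 -> Set("hostB")),
  blacklisted = Set.empty,
  host = "hostA")
```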

