本系列文章源自JerryLead的SparkInternals,本文只是在作者的原文基础上加入自己的理解,批注,和部分源码,作为学习之用
注:原文是基于Spark 1.0.2 , 而本篇笔记是基于spark 2.2.0, 对比后发现核心部分变化不大,依旧值得参考
Broadcast
顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去。这样的场景很多,比如 driver 上有一张表,其他节点上运行的 task 需要 lookup 这张表,那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。如何实现一个可靠高效的 broadcast 机制是一个有挑战性的问题。先看看 Spark 官网上的一段话:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
问题:为什么只能 broadcast 只读的变量?
这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。
问题:broadcast 到节点而不是 broadcast 到每个 task?
因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。
问题: 具体怎么用 broadcast?
driver program 例子:
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
driver 使用 sc.broadcast()
声明要 broadcast 的 data,bdata 的类型是 Broadcast。
当 rdd.transformation(func)
需要用 bdata 时,直接在 func 中调用,比如上面的例子中的 map() 就使用了 bdata.value.size。
问题:怎么实现 broadcast?
broadcast 的实现机制很有意思:
1. 分发 task 的时候先分发 bdata 的元信息
Driver 先建一个本地文件夹用以存放需要 broadcast 的 data,并启动一个可以访问该文件夹的 HttpServer。当调用val bdata = sc.broadcast(data)
时就把 data 写入文件夹,同时写入 driver 自己的 blockManger 中(StorageLevel 为内存+磁盘),获得一个 blockId,类型为 BroadcastBlockId。
//initialize
sparkSession.build()#env.broadcastManager.initialize()
new TorrentBroadcastFactory.initialize()
//use broadcast
sc.broadcast()
broadcastManager.newBroadcast()
//Divide the object into multiple blocks and put those blocks in the block manager.
new TorrentBroadcast[T](value_, nextBroadcastId.getAndIncrement()).writeBlocks()
//保存一份到driver上
SparkEnv.get.blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)
doPutIterator()#memoryStore.putIteratorAsValues()#diskStore.put(blockId)
//以4m分别保存block("spark.broadcast.blockSize", "4m"),并得到meta
block MetaDatas = TorrentBroadcast.blockifyObject(value, blockSize..)
foreach block MetaData :
blockManager.putBytes(BroadcastBlockId, MEMORY_AND_DISK_SER...)
doPutBytes()#memoryStore.putIteratorAsValues()#diskStore.putBytes()
//异步复制数据,sc.broadcast()应该只会在driver端保留一份数据,replication=1,后面executorfetch数据时才慢慢增加broadcast的副本数量
if level.replication > 1 :ThreadUtils.awaitReady(replicate(ByteBufferBlockData(bytes, false)...)
//复制副本规则,作为参考
blockManager.replicate()
//请求获得其他BlockManager的id
val initialPeers = getPeers(false)
blockManagerMaster.getPeers(blockManagerId).sortBy(_.hashCode)
//从driver上获取其他节点
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
//BlockManagerMasterEndpoint中返回非driver和非当前节点的blockManagerId
blockManagerInfo.keySet.contains(blockManagerId)#blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
foreach block replicate replication-1 nodes: blockTransferService.uploadBlockSync()
//后面就是发送信息给blockManager,再保存数据通知driver
blockManager.putBytes()#reportBlockStatus(blockId, putBlockStatus)
blockManagerMasterEndpoint.updateBlockInfo() //driver端更新信息
当调用rdd.transformation(func)
时,如果 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化得到 serialized task,注意序列化的时候不会序列化 bdata 中包含的 data。
//TorrentBroadcast.scala 序列化的时候不会序列化 bdata 中包含的 data
// @transient表明不序列化_value
@transient private lazy val _value: T = readBroadcastBlock()
/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}
上一章讲到 serialized task 从 driverEndPoint 传递到 executor 时使用 RPC 的传消息机制,消息不能太大,而实际的 data 可能很大,所以这时候还不能 broadcast data。
driver 为什么会同时将 data 放到磁盘和 blockManager 里面?放到磁盘是为了让 HttpServer 访问到,放到 blockManager 是为了让 driver program 自身使用 bdata 时方便(其实我觉得不放到 blockManger 里面也行)。
那么什么时候传送真正的 data?在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。
//runjob()
dagScheduler.submitMissingTasks(stage: Stage, jobId: Int)
val taskIdToLocations = getPreferredLocs(stage.rdd, id)-----
getCacheLocs()//从本地或者driver获取缓存rdd位置
rdd.preferredLocations()//也会从checkpointrdd中寻找
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage => //把func也序列化了,func里面包含broadcast变量
//不会序列化 broadcast变量 中包含的 data
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)//广播task
taskScheduler.submitTasks(new TaskSet(...))
...
//TorrentBroadcast.scala
//使用lazy方式,真正反序列化使用_value才调用方法读值
@transient private lazy val _value: T = readBroadcastBlock()
TorrentBroadcast.readBroadcastBlock()
blockManager.getLocalValues()//本地读取
memoryStore.getValues(blockId)#diskStore.getBytes(blockId)
readBlocks() //本地无则从driver/其他executor读取
foreach block :
blockManager.getRemoteBytes(BroadcastBlockId(id, "piece" + pid))
blockManager.putBytes()//保存在本地
//整个broadcast保存在本地
blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)
blocks.foreach(_.dispose()) //去重,把之前分开保存的block删除
下面探讨 broadcast data 时候的两种实现方式:
2. HttpBroadcast
spark 2.2 的Broadcast package中已经去除了HttpBroadcast,只留下了TorrentBroadcast
顾名思义,HttpBroadcast 就是每个 executor 通过的 http 协议连接 driver 并从 driver 那里 fetch data。
Driver 先准备好要 broadcast 的 data,调用sc.broadcast(data)
后会调用工厂方法建立一个 HttpBroadcast 对象。该对象做的第一件事就是将 data 存到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘,blockId 类型为 BroadcastBlockId。
同时 driver 也会将 broadcast 的 data 写到本地磁盘,例如写入后得到 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0
, 这个文件夹作为 HttpServer 的文件目录。
Driver 和 executor 启动的时候,都会生成 broadcastManager 对象,调用 HttpBroadcast.initialize(),driver 会在本地建立一个临时目录用来存放 broadcast 的 data,并启动可以访问该目录的 httpServer。
Fetch data:在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用 http 协议连接 driver 上的 httpServer,将 data fetch 过来。得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。
HttpBroadcast 最大的问题就是 driver 所在的节点可能会出现网络拥堵,因为 worker 上的 executor 都会去 driver 那里 fetch 数据。
3. TorrentBroadcast
为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast,这个类似于大家常用的 BitTorrent 技术。基本思想就是将 data 分块成 data blocks,然后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去了。
HttpBroadcast 是通过传统的 http 协议和 httpServer 去传 data,在 TorrentBroadcast 里面使用在上一章介绍的 blockManager.getRemoteValues() => NIO ShuffleClient 传数据的方法来传递,读取数据的过程与读取 cached rdd 的方式类似,可以参阅 CacheAndCheckpoint 中的最后一张图。
下面讨论 TorrentBroadcast 的一些细节:
driver 端:
Driver 先把 data 序列化到 byteArray,然后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB
设置)大小的 data block,每个 data block 被 TorrentBlock 对象持有。切割完 byteArray 后,会将其回收,因此内存消耗虽然可以达到 2 * Size(data),但这是暂时的。
完成分块切割后,就将分块信息(称为 meta 信息)存放到 driver 自己的 blockManager 里面,StorageLevel 为内存+磁盘,同时会通知 driver 自己的 blockManagerMaster 说 meta 信息已经存放好。通知 blockManagerMaster 这一步很重要,因为 blockManagerMaster 可以被 driver 和所有 executor 访问到,信息被存放到 blockManagerMaster 就变成了全局信息。
之后将每个分块 data block 存放到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘。存放后仍然通知 blockManagerMaster 说 blocks 已经存放好。到这一步,driver 的任务已经完成。
Executor 端:
executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去访问 TorrentBroadcast._value,调用其readBroadcastBlock()
方法。这个方法首先得到 bdata 对象,**然后发现 bdata 里面没有包含实际的 data。怎么办?**先询问本地所在的 executor 里的 blockManager 是会否包含 data(通过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。否则,就通过本地 blockManager 去连接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。
**BT 过程:**task 先在本地开一个数组用于存放将要 fetch 过来的 data blocks val blocks = new Array[BlockData](numBlocks)
,然后打乱要 fetch 的 data blocks 的顺序,for (pid <- Random.shuffle(Seq.range(0, numBlocks)))
比如如果 data block 共有 5 个,那么打乱后的 fetch 顺序可能是 3-1-2-4-5。然后按照打乱后的顺序去 fetch 一个个 data block。**每 fetch 到一个 block 就将其存放到 executor 的 blockManager 里面,同时通知 driver 上的 blockManagerMaster 说该 data block 多了一个存储地址。**这一步通知非常重要,意味着 blockManagerMaster 知道 data block 现在在 cluster 中有多份,下一个不同节点上的 task 再去 fetch 这个 data block 的时候,可以有两个选择了,而且会随机选择一个去 fetch。这个过程持续下去就是 BT 协议,随着下载的客户端越来越多,data block 服务器也越来越多,就变成 p2p下载了。关于 BT 协议,Wikipedia 上有一个动画。
整个 fetch 过程结束后,task 会开一个大 Array[Byte],大小为 data 的总大小,然后将 data block 都 copy 到这个 Array,然后对 Array 中 bytes 进行反序列化得到原始的 data,这个过程就是 driver 序列化 data 的反过程。
最后将 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 为内存+磁盘。显然,这时候 data 在 blockManager 里存了两份,不过等全部 executor 都 fetch 结束,存储 data blocks 那份可以删掉了。
问题:broadcast RDD 会怎样?
@Andrew-Xia 回答道:不会怎样,就是这个rdd在每个executor中实例化一份。
Discussion
公共数据的 broadcast 是很实用的功能,在 Hadoop 中使用 DistributedCache,比如常用的-libjars
就是使用 DistributedCache 来将 task 依赖的 jars 分发到每个 task 的工作目录。不过分发前 DistributedCache 要先将文件上传到 HDFS。这种方式的主要问题是资源浪费,如果某个节点上要运行来自同一 job 的 4 个 mapper,那么公共数据会在该节点上存在 4 份(每个 task 的工作目录会有一份)。但是通过 HDFS 进行 broadcast 的好处在于单点瓶颈不明显,因为公共 data 首先被分成多个 block,然后不同的 block 存放在不同的节点。这样,只要所有的 task 不是同时去同一个节点 fetch 同一个 block,网络拥塞不会很严重。
对于 Spark 来讲,broadcast 时考虑的不仅是如何将公共 data 分发下去的问题,还要考虑如何让同一节点上的 task 共享 data。
对于第一个问题,Spark 设计了两种 broadcast 的方式,传统存在单点瓶颈问题的 HttpBroadcast,和类似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用传统的 client-server 形式的 HttpServer 来传递真正的 data,而 TorrentBroadcast 使用 blockManager 自带的 NIO 通信方式来传递 data。TorrentBroadcast 存在的问题是慢启动和占内存,慢启动指的是刚开始 data 只在 driver 上有,要等 executors fetch 很多轮 data block 后,data server 才会变得可观,后面的 fetch 速度才会变快。executor 所占内存的在 fetch 完 data blocks 后进行反序列化时需要将近两倍 data size 的内存消耗。不管哪一种方式,driver 在分块时会有两倍 data size 的内存消耗。
对于第二个问题,每个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存+磁盘),可以保证在 executor 执行的 tasks 能够共享 data。
其实 Spark 之前还尝试了一种称为 TreeBroadcast 的机制,详情可以见技术报告 Performance and Scalability of Broadcast in Spark。
更深入点,broadcast 可以用多播协议来做,不过多播使用 UDP,不是可靠的,仍然需要应用层的设计一些可靠性保障机制。
今天的文章SparkCore-Broadcast-7分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/22830.html