Spark 源码系列(五)分布式缓存

Spark 源码系列(五)分布式缓存这一章想讲一下 Spark 的缓存是如何实现的。这个 persist 方法是在 RDD 里面的,所以我们直接打开 RDD 这个类。 它调用 SparkContext 去缓存这个 RDD,追杀下去。 它居然是用一个 HashMap 来存的,具体看这个 map 的类型是 TimeS…

这一章想讲一下 Spark 的缓存是如何实现的。这个 persist 方法是在 RDD 里面的,所以我们直接打开 RDD 这个类。

  def persist(newLevel: StorageLevel): this.type = {
    // StorageLevel不能随意更改
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    // 注册清理方法
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

它调用 SparkContext 去缓存这个 RDD,追杀下去。

  private[spark] def persistRDD(rdd: RDD[_]) {
    persistentRdds(rdd.id) = rdd
  }

它居然是用一个 HashMap 来存的,具体看这个 map 的类型是 TimeStampedWeakValueHashMap[Int, RDD[_]] 类型。把存进去的值都隐式转换成 WeakReference,然后加到一个内部的一个 ConcurrentHashMap 里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。

CacheManager

现在并没有保存,等到真正运行 Task 运行的时候才会去缓存起来。入口在 Task 的 runTask 方法里面,具体的我们可以看 ResultTask,它调用了 RDD 的 iterator 方法。

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

一旦设置了 StorageLevel,就要从 SparkEnv 的 cacheManager 取数据。

  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
    val key = RDDBlockId(rdd.id, split.index)
    blockManager.get(key) match {
      case Some(values) =>
        // 已经有了,直接返回就可以了
        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

      case None =>
        // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
        loading.synchronized {
          if (loading.contains(key)) {
            while (loading.contains(key)) {
              try {
                loading.wait()
              } catch {
                case e: Exception =>
                  logWarning(s"Got an exception while waiting for another thread to load $key", e)
              }
            }
            // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
            blockManager.get(key) match {
              case Some(values) =>
                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
              case None =>
                loading.add(key)
            }
          } else {
            loading.add(key)
          }
        }
        try {
          // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
          val computedValues = rdd.computeOrReadCheckpoint(split, context)

          // 如果是本地运行的,就没必要缓存了,直接返回即可
          if (context.runningLocally) {
            return computedValues
          }

          // 跟踪blocks的更新状态
          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
          val returnValue: Iterator[T] = {
            if (storageLevel.useDisk && !storageLevel.useMemory) {
              /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager * 然后把结果直接返回,它不需要把结果一下子全部加载进内存 * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
              blockManager.get(key) match {
                case Some(values) =>
                  values.asInstanceOf[Iterator[T]]
                case None =>
                  throw new Exception("Block manager failed to return persisted valued")
              }
            } else {
              // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
              val elements = new ArrayBuffer[Any]
              elements ++= computedValues
              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
              elements.iterator.asInstanceOf[Iterator[T]]
            }
          }

          // 更新task的监控参数
          val metrics = context.taskMetrics
          metrics.updatedBlocks = Some(updatedBlocks)

          new InterruptibleIterator(context, returnValue)

        } finally {
          // 改完了,释放锁
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }

1、如果 blockManager 当中有,直接从 blockManager 当中取。

2、如果 blockManager 没有,就先用 RDD 的 compute 函数得到出来一个 Iterable 接口。

3、如果 StorageLevel 是只保存在硬盘的话,就把值存在 blockManager 当中,然后从 blockManager 当中取出一个 Iterable 接口,这样的好处是不会一次把数据全部加载进内存。

4、如果 StorageLevel 是需要使用内存的情况,就把结果添加到一个 ArrayBuffer 当中一次返回,另外在 blockManager 存上一份,下次直接从 blockManager 取。

对 StorageLevel 说明一下吧,贴一下它的源码。

class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。

下面我们的目标要放到 blockManager。

BlockManager

BlockManager 这个类比较大,我们从两方面开始看吧,putBytes 和 get 方法。先从 putBytes 说起,之前说过 Task 运行结束之后,结果超过 10M 的话,会用 BlockManager 缓存起来。

env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

putBytes 内部又掉了另外一个方法 doPut,方法很大呀,先折叠起来。

  private def doPut(
      blockId: BlockId,
      data: Values,
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
    val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
  // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
    val putBlockInfo = {
      val tinfo = new BlockInfo(level, tellMaster)
      // 如果不存在,就添加到blockInfo里面
      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
      if (oldBlockOpt.isDefined) {
        // 如果已经存在了,就不需要重复添加了
        if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
        }
        // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
        oldBlockOpt.get
      } else {
        tinfo
      }
    }

    val startTimeMs = System.currentTimeMillis
    // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
    // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
    // 时候,我们就必须依赖返回一个Iterator
    var valuesAfterPut: Iterator[Any] = null
    // Ditto for the bytes after the put
    var bytesAfterPut: ByteBuffer = null
    // Size of the block in bytes
    var size = 0L

    // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
      // duplicate并不是复制这些数据,只是做了一个包装
      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
      Future {
        // 把block复制到别的机器上去
        replicate(blockId, bufferView, level)
      }
    } else {
      null
    }

    putBlockInfo.synchronized {

      var marked = false
      try {
        if (level.useMemory) {
          // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
          // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
          val res = data match {
            case IteratorValues(iterator) =>
              memoryStore.putValues(blockId, iterator, level, true)
            case ArrayBufferValues(array) =>
              memoryStore.putValues(blockId, array, level, true)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              memoryStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case Left(newIterator) => valuesAfterPut = newIterator
          }
          // 把被置换到硬盘的blocks记录到updatedBlocks上
          res.droppedBlocks.foreach { block => updatedBlocks += block }
        } else if (level.useOffHeap) {
          // 保存到Tachyon上.
          val res = data match {
            case IteratorValues(iterator) =>
              tachyonStore.putValues(blockId, iterator, level, false)
            case ArrayBufferValues(array) =>
              tachyonStore.putValues(blockId, array, level, false)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              tachyonStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        } else {
          // 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
          val askForBytes = level.replication > 1
          val res = data match {
            case IteratorValues(iterator) =>
              diskStore.putValues(blockId, iterator, level, askForBytes)
            case ArrayBufferValues(array) =>
              diskStore.putValues(blockId, array, level, askForBytes)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              diskStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        }
     // 通过blockId获得当前的block状态
        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
          // 成功了,把该block标记为ready,通知BlockManagerMaster
          marked = true
          putBlockInfo.markReady(size)
          if (tellMaster) {
            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
          }
          updatedBlocks += ((blockId, putBlockStatus))
        }
      } finally {
        // 如果没有标记成功,就把该block信息清除
       if (!marked) {
          blockInfo.remove(blockId)
          putBlockInfo.markFailure()
        }
      }
    }

    // 把数据发送到别的节点做备份
    if (level.replication > 1) {
      data match {
        case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
        case _ => {
          val remoteStartTime = System.currentTimeMillis
          // 把Iterator里面的数据序列化之后,发送到别的节点
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level)
        }
      }
    }
    // 销毁bytesAfterPut
    BlockManager.dispose(bytesAfterPut)
    updatedBlocks
  }

从上面的的来看:

1、存储的时候按照不同的存储级别分了 3 种情况来处理:存在内存当中(包括 MEMORY 字样的),存在 tachyon 上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。

2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带 2 字的都是这种,2 表示一个 block 会在两个节点上保存。

3、存储完毕之后,会向 BlockManagerMaster 汇报 block 的情况。

4、这里面的序列化其实是先压缩后序列化,默认使用的是 LZF 压缩,可以通过 spark.io.compression.codec 设定为 snappy 或者 lzo,序列化方式通过 spark.serializer 设置,默认是 JavaSerializer。

img

接下来我们再看 get 的情况。

    val local = getLocal(blockId)
    if (local.isDefined) return local
    val remote = getRemote(blockId)
    if (remote.isDefined) return remote
    None

先从本地取,本地没有再去别的节点取,都没有,返回 None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?

我们先看 getRemote 方法

  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
        if (asValues) {
          return Some(dataDeserialize(blockId, data))
        } else {
          return Some(data)
        }
      }
    }
    None
  }

这个方法包括两个步骤:

1、用 blockId 通过 master 的 getLocations 方法找到它的位置。

2、通过 BlockManagerWorker.syncGetBlock 到指定的节点获取数据。

ok,下面就重点讲 BlockManager 和 BlockManagerMaster 之间的关系,以及 BlockManager 之间是如何相互传输数据。

BlockManager 与 BlockManagerMaster 的关系

BlockManager 我们使用的时候是从 SparkEnv.get 获得的,我们观察了一下 SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到 SparkEnv 里面的 create 方法,右键 FindUsages,就会找到两个地方调用了,一个是 SparkContext,另一个是 Executor。在 SparkEnv 的 create 方法里面会实例化一个 BlockManager 和 BlockManagerMaster。这里我们需要注意看 BlockManagerMaster 的实例化方法,里面调用了 registerOrLookup 方法。

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }

所以从这里可以看出来,除了 Driver 之后的 actor 都是,都是持有的 Driver 的引用 ActorRef。梳理一下,我们可以得出以下结论:

1、SparkContext 持有一个 BlockManager 和 BlockManagerMaster。

2、每一个 Executor 都持有一个 BlockManager 和 BlockManagerMaster。

3、Executor 和 SparkContext 的 BlockManagerMaster 通过 BlockManagerMasterActor 来通信。

接下来,我们看看 BlockManagerMasterActor 里的三组映射关系。

  // 1、BlockManagerId和BlockManagerInfo的映射关系
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
  // 2、Executor ID 和 Block manager ID的映射关系
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
  // 3、BlockId和保存它的BlockManagerId的映射关系
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

看到这三组关系,前面的 getLocations 方法不用看它的实现,我们都应该知道是怎么找了。

BlockManager 相互传输数据

BlockManager 之间发送数据和接受数据是通过 BlockManagerWorker 的 syncPutBlock 和 syncGetBlock 方法来实现。看 BlockManagerWorker 的注释,说是 BlockManager 的网络接口,采用的是事件驱动模型。

再仔细看这两个方法,它传输的数据包装成 BlockMessage 之后,通过 ConnectionManager 的 sendMessageReliablySync 方法来传输。

接下来的故事就是 nio 之间的发送和接收了,就简单说几点吧:

1、ConnectionManager 内部实例化一个 selectorThread 线程来接收消息,具体请看 run 方法。

2、Connection 发送数据的时候,是一次把消息队列的 message 全部发送,不是一个一个 message 发送,具体看 SendConnection 的 write 方法,与之对应的接收看 ReceivingConnection 的 read 方法。

3、read 完了之后,调用回调函数 ConnectionManager 的 receiveMessage 方法,它又调用了 handleMessage 方法,handleMessage 又调用了 BlockManagerWorker 的 onBlockMessageReceive 方法。传说中的事件驱动又出现了。

  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match {
      case BlockMessage.TYPE_PUT_BLOCK => {
        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
        putBlock(pB.id, pB.data, pB.level)
        None
      }
      case BlockMessage.TYPE_GET_BLOCK => {
        val gB = new GetBlock(blockMessage.getId)
        val buffer = getBlock(gB.id)
        Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
      }
      case _ => None
    }
  }

根据 BlockMessage 的类型进行处理,put 类型就保存数据,get 类型就从本地把 block 读出来返回给它。

img

注:BlockManagerMasterActor 是存在于 BlockManagerMaster 内部,画在外面只是因为它在通信的时候起了关键的作用的,Executor 上持有的 BlockManagerMasterActor 均是 Driver 节点的 Actor 的引用。

广播变量

先回顾一下怎么使用广播变量:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

看了一下实现调用的是 broadcastFactory 的 newBroadcast 方法。

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

默认的 broadcastFactory 是 HttpBroadcastFactory,内部还有另外一个实现 TorrentBroadcastFactory,先说 HttpBroadcastFactory 的 newBroadcast 方法。

它直接 new 了一个 HttpBroadcast。

  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  }

  if (!isLocal) {
    HttpBroadcast.write(id, value_)
  }

它的内部做了两个操作,把数据保存到 driver 端的 BlockManager 并且写入到硬盘。

TorrentBroadcast 和 HttpBroadcast 都把数据存进了 BlockManager 做备份,但是 TorrentBroadcast 接着并没有把数据写入文件,而是采用了下面这种方式:

  def sendBroadcast() {
    // 把数据给切分了,每4M一个分片
    val tInfo = TorrentBroadcast.blockifyObject(value_)
    totalBlocks = tInfo.totalBlocks
    totalBytes = tInfo.totalBytes
    hasBlocks = tInfo.totalBlocks

    // 把分片的信息存到BlockManager,并通知Master
    val metaId = BroadcastBlockId(id, "meta")
    val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(
        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }

    // 遍历所有分片,存到BlockManager上面,并通知Master
    for (i <- 0 until totalBlocks) {
      val pieceId = BroadcastBlockId(id, "piece" + i)
      TorrentBroadcast.synchronized {
        SparkEnv.get.blockManager.putSingle(
          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
      }
    }
  }

1、把数据序列化之后,每 4M 切分一下。

2、切分完了之后,把所有分片写入 BlockManager。

但是找不到它们是怎么传播的??只是写入到 BlockManager,但是 tellMaster 为 false 的话,就相当于存在本地了,别的 BlockManager 是没法获取到的。

这时候我注意到它内部有两个方法,readObject 和 writeObject,会不会和这两个方法有关呢?它们做的操作就是给 value 赋值。

为了检验这个想法,我亲自调试了一下,在反序列化任务的时候,readObject 这个方法是被 ObjectInputStream 调用了。这块的知识大家可以百度下 ObjectInputStream 和 ObjectOutputStream。

具体操作如下:

1、打开 BroadcastSuite 这个类,找到下面这段代码,图中的地方原来是 512, 被我改成 256 了,之前一直运行不起来。

  test("Accessing TorrentBroadcast variables in a local cluster") {
    val numSlaves = 4
    sc = new SparkContext("local-cluster[%d, 1, 256]".format(numSlaves), "test", torrentConf)
    val list = List[Int](1, 2, 3, 4)
    val broadcast = sc.broadcast(list)
    val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
    assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
  }

2、找到 TorrentBroadcast,在 readObject 方法上打上断点。

3、开始调试吧。

之前讲过,Task 是被序列化之后包装在消息里面发送给 Worker 去运行的,所以在运行之前必须把 Task 进行反序列化,具体在 TaskRunner 的 run 方法里面:

task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

Ok,告诉大家入口了,剩下的大家去尝试吧。前面介绍了怎么切分的,到 TorrentBroadcast 的 readObject 里面就很容易理解了。

1、先通过 MetaId 从 BlockManager 里面取出来 Meta 信息。

2、通过 Meta 信息,构造分片 id,去 BlockManager 里面取。

3、获得分片之后,把分片写入到本地的 BlockManager 当中。

4、全部取完之后,通过下面的方法反向赋值。

if (receiveBroadcast()) {
     value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
&emsp;&emsp; SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

5、把 value_又顺手写入到 BlockManager 当中。(这里相当于写了两份进去,大家要注意了哈,内存消耗还是大大地。幸好是 MEMORY_AND_DISK 的)

这么做是有好处的,这是一种类似 BT 的做法,把数据切分成一小块一小块,容易传播,从不同的机器上获取一小块一小块的数据,最后组装成完整的。

把完整的 value 写入 BlockManager 是为了使用的时候方便,不需要再次组装。

相关参数

// BlockManager的最大内存
spark.storage.memoryFraction 默认值0.6
// 文件保存的位置
spark.local.dir 默认是系统变量java.io.tmpdir的值
// tachyon保存的地址
spark.tachyonStore.url 默认值tachyon://localhost:19998
// 默认不启用netty来传输shuffle的数据
spark.shuffle.use.netty 默认值是false
spark.shuffle.sender.port 默认值是0
// 一个reduce抓取map中间结果的最大的同时抓取数量大小(to avoid over-allocating memory for receiving shuffle outputs)
spark.reducer.maxMbInFlight 默认值是48*1024*1024
// TorrentBroadcast切分数据块的分片大小
spark.broadcast.blockSize 默认是4096
// 广播变量的工厂类
spark.broadcast.factory 默认是org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory
// 压缩格式
spark.io.compression.codec 默认是LZF,可以设置成Snappy或者Lzo

今天的文章Spark 源码系列(五)分布式缓存分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/22794.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注