Spark – 一文搞懂 Partitioner

Spark – 一文搞懂 Partitionerspark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理。

​ ​本文已参与「新人创作礼」活动,一起开启掘金创作之路。

一.引言

spark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理,一个 partition 内可能包含一个文件或者多个文件的内容,Partitioner 可以基于 pairRDD 的 key 实现自定义 partition 的内容。

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

Partitioner 函数最基本的两个方法是 numPartitions 和 getPartition(key: Any):

A.numPartitions: 获取总的分区数

B.getPartition:

根据 key 获取当前 partition 对应的分区数目,范围在 [0, numPartitions-1],这里的 partitionId 与 TaskContext.getPartitionId 的值一致,通过 hash(key) 得到 int 的 partitionNum 变量,相同 partitonNum 的 key 对应的 paidRDD 将分到同一个 partition 内处理

常见的 Partition 分区类型有如下几种:

分区函数 分区方法
HashPartitioner 根据 hash(key) 分区
RangePartitioner 根据 Range 边界分区
Partitioner 根据自定义规则分区

二.HashPartitioner

1.源码分析

hashPartitioner 基于 Object.hashcode % partitionNum 进行分区,需要注意 partitionNum 的值是需要 >= 0 的,partiionNum 的获取通过 getPartition 函数内的 nonNegativeMod 函数实现

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

nonNegativeMod 在实现 hashCode % partitionNum 的基础上增加了非负性的要求,因为 partitionNum 是大于等于 0 的数目:

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

2.代码测试

    val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    val partitionNum = 5

    testRdd.partitionBy(new HashPartitioner(partitionNum)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
      }
    })

Spark - 一文搞懂 Partitioner

这里将 0-499 共 500 个数字 zipWithIndex 生成 pairRdd 并通过 HashPartitioner 生成 5 个 Partition,通过 TaskContext 获取 partitionId,为了日志一一打印,这里采用 local[1] 的配置 : 

val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[1]")

Spark - 一文搞懂 Partitioner

可以看到红框内同一个 TaskId 对应的 partition 内的 key 都具有相同的 mod 值,所以被分到同一分区。 

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

3.repartition

正常使用的 repartition 函数采用 HashPartitioner 函数作为默认分区函数,下面尝试一下:

    println("=============================repartition=============================")
    testRdd.repartition(5).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
      }
    })

Spark - 一文搞懂 Partitioner

与上面不同的是 taskId 有区别,但是相同 mod 的 key 仍然会分到同一分区:

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

三.RangePartitioner

1.源码分析

RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。 

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

除了 partitions 的参数外,RangePartitioner 还需要将待分区的 rdd 传入供随机采样生成 rangeBounds 使用,相比于 HashPartition 直接 hashCodes % partitionNum 的操作,RangePartitioner 分区共分两步:

A.获取分区 Boundary

需要采样的分区样本大小上线为 1m,转换为 double 避免精度溢出,第一个 else 逻辑内考虑如果一个分区内包含的项目数远远超过平均数,则从中重新采样,以确保该分区能够收集到足够的采样数目,最下面的 if 函数使用所需的采样概率对不平衡分区重新采样,最终得到分区的边界,这里可以抽空单独拎出来研究一下。举个例子大致理解下,假如所有 partition 内的 key 的范围是 0-500,随机生成5个分区,则生成 101-203-299-405 这样的区间,每一个数字代表其分区的上界,例如分区0的上界为 101,分区1的上界为 203,依次类推,最终生成 5 个分区。

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

B.根据Boundary获取分区

如果分区数组长度不大于 128,则进行简单的暴力循环搜索,如果超过 128,则进行二分查找,同时提供根据 ascending 参数决定 partitionId 的顺序或逆序。这与之前 Spark-Scala 数据特征分桶时采用的优化策略一致,有兴趣可以看看:Scala – 数值型特征分桶

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

2.代码测试

    val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)

    testRdd.partitionBy(new RangePartitioner(5, testRdd)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
      }
    })

Spark - 一文搞懂 Partitioner

依然使用 500 个纯数字 RDD 进行 range 分区的测试,为了验证大致均分的思想,这里最后不再打印 mod 结果,转而打印每个 partition 内元素的数量,可以看到这次每组数量不像之前 HashPartitioner 得到的一样均匀,而是介于 500/5=100 的上下,但是总数为 500。

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

四.SelfPartitioner

1.源码分析

自定义 Partitioner 主要实现下述两个功能,上面也提到了,再简单补充下:

numPartitions: 获取总的分区数

getPartition: 获取 key 对应的分区 id

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

2.代码实现

A.SelfPartitioner

基于上面 RangePartitioner 分区不均匀的情况,我们采用 SelfParitioner 自定义分区的方式实现均匀分区,这里偷懒直接生成了对应的上界 boundary,实际场景中 boundary 应该基于 partitionNum 的数量动态生成,getPartition 函数内引入了 break 机制,不熟悉的同学可以移步:Scala – 优雅的break,随后就是基础的暴力循环,如果找到上界则返回上界对应的 index 作为分区 id。

    import scala.util.control.Breaks._
    class SelfPartition(partitionNum: Int) extends Partitioner {

      val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex

      override def numPartitions: Int = partitionNum

      override def getPartition(key: Any): Int = {
        val keyNum = key.toString.toInt
        var partitionNum = 0
        breakable(
          boundary.foreach(bound => {
            if (keyNum < bound._1) {
              partitionNum = bound._2
              break()
            }
          })
        )
        partitionNum
      }
    }

Spark - 一文搞懂 Partitioner

B.运行结果

    val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)

    testRdd.partitionBy(new SelfPartition(5)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
      }
    })

Spark - 一文搞懂 Partitioner

通过 SelfPartitioner 分区后可以看到 0-499 共 500 个元素被均匀分配到 5 个 Partition 内,除了最简单的 boundary 方法分区外,也可以自定义 hash 方法,key 的类型默认为 Any,如果 key 不是 scala 的基本数据类型,则使用 key.asInstanceOf[T] 转换即可。 

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

五.repartitionAndSortWithinPartitions

1.函数分析

Spark - 一文搞懂 PartitionerSpark - 一文搞懂 Partitioner

除了正常的分区需求外,spark 还提供 repartitionAndSortWithinPartitions 函数,该函数根据给定的分区器 Partitioner 进行分区划分得到新的 RDD,并根据每个键进行排序,使得 RDD 中的数据保持一定顺序,该方法比 repartition + sorting 更加高效,因为它把排序机制放入了 shuffle 的过程中。 

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

源码中该方法位于 OrderedRddFunctions 类内,只支持传入分区函数 Partitioner,ordering 排序规则需要使用 implict 传入隐函数的方法定义:

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

对于需要分区的 key: Any,需要定义隐函数保证其实现 Ordering 接口才能实现分区后排序,否则只能分区没有排序。 

2.代码实现

A.分区排序依据

在分区函数的基础上,增加了 Ordering 隐函数,这里 Partitioner 函数仍然负责根据 key 得到分区 Id,和上面不同的时,分区的同时对 rdd 进行 shuffle,其中 order 的规则由隐函数给出,这里通过比较二者的分数进行排序,如果想要逆序只需要添加负号即可 -(x.score – y.score)。

  // 学生类
  case class Student(name: String, score: Int)

  // 隐函数-Ordering
  implicit object StudentOrdering extends Ordering[Student] {
    override def compare(x: Student, y: Student): Int = {
      x.score - y.score
    }
  }

  class SelfSortPartition(partitionNum: Int) extends Partitioner {

    val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex

    override def numPartitions: Int = partitionNum

    override def getPartition(key: Any): Int = {
      val stuKey = key.asInstanceOf[Student]
      val keyNum = stuKey.name.toInt
      var partitionNum = 0
      breakable(
        boundary.foreach(bound => {
          if (keyNum < bound._1) {
            partitionNum = bound._2
            break()
          }
        })
      )
      partitionNum
    }
  }

Spark - 一文搞懂 Partitioner

B.主函数

这里使用 0-499 的 String 类型作为学生的编号,Score 则采取 math.random x 100 进行模拟,分区使用 Student 的 name id,所以每个元素的分区不变,变的是每个元素的顺序。

    println("=============================SortPartition=============================")
    val studentRdd = sc.parallelize((0 until 500).toArray.map(num => (Student(num.toString, (math.random * 100).toInt), true)))
    studentRdd.take(5).foreach(println(_))

    studentRdd.repartitionAndSortWithinPartitions(new SelfSortPartition(5)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val taskId = TaskContext.getPartitionId()
        println("===========================")
        partition.toArray.take(5).map(stu => {
          println(s"TaskId: ${taskId} Name: ${stu._1.name} Score: ${stu._1.score}")
        })
      }
    }

Spark - 一文搞懂 Partitioner

由于使用 x.score – y.score 顺序计数,所以按分数从小到大排序:

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

这一届是带过最差的学生,咋还能考0分。

C.其他写法

除了 StudentOrdering 的写法,也可以采用直接 Object Student 的写法,这里 A <: Student 表示任何 A 的子类都支持该隐式调用,关于 <: 相关知识可以参考:Scala Generic 泛型类详解 – T

  object Student {
    implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
      Ordering.by(stu => stu.score)
    }
  }


  implicit object StudentOrdering extends Ordering[Student] {
    override def compare(x: Student, y: Student): Int = {
      x.score - y.score
    }
  }

Spark - 一文搞懂 Partitioner

如果想要支持多重排序,可以在元祖内增加多个字段,会优先比较 name 再比较 score,以此类推,同理如果想要逆序,例如根据分数逆序排列,则改为 (stu.name,-1 * stu.score)

  object Student {
    implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
      Ordering.by(stu => (stu.name, stu.score))
    }
  }

Spark - 一文搞懂 Partitioner

如果对应的分区 key 没有实现 implict 的比较隐函数,则函数会直接报灰,无法编译:

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

六.总结

Partitioner 的一般用法大致就这些,除了三种 HashPartitioner 函数外,Spark 也可以通过 repartitionAndSortWithinPartitions 实现分区 + 排序的需求,总体来说,Partitioner 支持用户自定义分区规则去规划任务的 task 需要处理什么样的 partition 数据,对于精细化处理和区域化定制的需求十分方便,除此之外,一些需要顺序处理的数据或者顺序存储的数据,通过 SortWithinPartitions 的方法也可以提高效率,非常奈斯👍。最后继续感叹命名的抽象性,partition –  分片、隔墙,现在看到屏风就像看到了 RDD。

Spark - 一文搞懂 Partitioner

Spark - 一文搞懂 Partitioner

今天的文章Spark – 一文搞懂 Partitioner分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/15526.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注