展示。提示:
只要你有重量级的初始化,就应该做一次
对于许多 RDD 元素,而不是每个 RDD 元素一次,如果这样
初始化,例如从第三方创建对象
库,无法序列化(以便 Spark 可以将其传输
集群到工作节点),使用mapPartitions() 而不是
map()。 mapPartitions() 提供要完成的初始化
每个工作任务/线程/分区一次,而不是每个RDD 数据一次
example : 的元素见下文。
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
第二季度。 flatMap 的行为是像 map 还是像 mapPartitions?
是的。请参阅flatmap.. 的示例 2。它不言自明。
第一季度。 RDD 的 map 和 mapPartitions 有什么区别
map 在每个元素级别运行正在使用的函数,而
mapPartitions 在分区级别执行函数。
示例场景:如果我们在特定的 RDD 分区中有 100K 元素,那么我们将在使用时触发映射转换使用的函数 100K 次map。
相反,如果我们使用mapPartitions,那么我们只会调用特定函数一次,但我们会传入所有 100K 记录并在一次函数调用中取回所有响应。
由于map 在特定函数上工作了很多次,所以性能会有所提升,特别是如果函数每次都在做一些昂贵的事情,如果我们传入所有元素就不需要做一次(如果是mappartitions)。
地图
对 RDD 的每一项应用一个转换函数并返回
结果作为一个新的 RDD。
列出变体
def map[U: ClassTag](f: T => U): RDD[U]
例子:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
地图分区
这是一个专门的映射,每个分区只调用一次。
各个分区的全部内容可作为
通过输入参数 (Iterarator[T]) 的顺序值流。
自定义函数必须返回另一个 Iterator[U]。结合的
结果迭代器会自动转换为新的 RDD。请
请注意,以下元组 (3,4) 和 (6,7) 缺失
结果是由于我们选择的分区。
preservesPartitioning 表示输入函数是否保留
partitioner,应该是false,除非这是一对 RDD 和输入
函数不会修改键。
列出变体
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
示例 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
示例 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上面的程序也可以用flatMap写成如下。
使用平面图的示例 2
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
结论:
mapPartitions 转换比map 更快,因为它调用你的函数一次/分区,而不是一次/元素..
延伸阅读:foreach Vs foreachPartitions When to use What?