但它必须保持分布式数据结构。
不幸的是,你所说的你想要做的事情在 Spark 中是不可能的。如果您愿意将数据集重新分区到单个分区(实际上是将其合并到单个主机上),您可以轻松编写一个函数来做您想做的事情,将增量值保留为一个字段。
由于 Spark 函数在执行时不会在网络上共享状态,因此无法创建您需要保持数据集完全分布的共享状态。
如果您愿意放宽您的要求并允许在一台主机上一次性整合和读取数据,那么您可以通过重新分区到单个分区并应用功能来做您想做的事情。这不会将数据拉到驱动程序上(将其保存在 HDFS/集群中),但仍会在单个执行程序上串行计算输出。例如:
package com.github.nevernaptitsa
import java.io.Serializable
import java.util
import org.apache.spark.sql.{Encoders, SparkSession}
object SparkTest {
class RunningSum extends Function[Int, Tuple2[Int, Int]] with Serializable {
private var runningSum = 0
override def apply(v1: Int): Tuple2[Int, Int] = {
runningSum+=v1
return (v1, runningSum)
}
}
def main(args: Array[String]): Unit ={
val session = SparkSession.builder()
.appName("runningSumTest")
.master("local[*]")
.getOrCreate()
import session.implicits._
session.createDataset(Seq(1,2,3,4,5))
.repartition(1)
.map(new RunningSum)
.show(5)
session.createDataset(Seq(1,2,3,4,5))
.map(new RunningSum)
.show(5)
}
}
这里的两个语句显示不同的输出,第一个提供正确的输出(串行,因为调用了repartition(1)),第二个提供不正确的输出,因为结果是并行计算的。
第一个语句的结果:
+---+---+
| _1| _2|
+---+---+
| 1| 1|
| 2| 3|
| 3| 6|
| 4| 10|
| 5| 15|
+---+---+
第二个语句的结果:
+---+---+
| _1| _2|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 9|
+---+---+