【发布时间】:2016-07-25 02:44:25
【问题描述】:
在Spark Streaming中,每批数据间隔总是产生一个且只有一个RDD,为什么我们使用foreachRDD()来foreach RDD? RDD只有一个,不需要foreach。
在我的测试中,我从来没有看到超过一个的 RDD。
【问题讨论】:
-
你到底真的明白答案了吗?
标签: apache-spark spark-streaming
在Spark Streaming中,每批数据间隔总是产生一个且只有一个RDD,为什么我们使用foreachRDD()来foreach RDD? RDD只有一个,不需要foreach。
在我的测试中,我从来没有看到超过一个的 RDD。
【问题讨论】:
标签: apache-spark spark-streaming
DStream 或“离散流”是一种将连续数据流分解为小块的抽象。这称为“微批处理”。每个微批次成为一个 RDD,提供给 Spark 进行进一步处理。 每个批次间隔为每个 DStream 生成一个且只有一个 RDD。
RDD 是数据的分布式集合。将其视为一组指向实际数据在集群中的位置的指针。
DStream.foreachRDD 是 Spark Streaming 中的“输出运算符”。它允许您访问 DStream 的底层 RDD 以执行对数据执行实际操作的操作。例如,使用foreachRDD,您可以将数据写入数据库。
这里的小转折是理解 DStream 是一个有时间限制的集合。让我将其与经典集合进行对比:获取用户列表并对其应用 foreach:
val userList: List[User] = ???
userList.foreach{user => doSomeSideEffect(user)}
这会将副作用函数doSomeSideEffect 应用于userList 集合的每个元素。
现在,假设我们现在不知道所有用户,所以我们无法建立他们的列表。取而代之的是,我们有大量用户,例如在早高峰时来到咖啡店的人:
val userDStream: DStream[User] = ???
userDstream.foreachRDD{usersRDD =>
usersRDD.foreach{user => serveCoffee(user)}
}
注意:
DStream.foreachRDD 为您提供RDD[User],不是单个用户。回到我们的咖啡示例,这是在某个时间间隔内到达的用户集合。rdd.foreach 为每位用户提供咖啡。考虑执行:我们可能有一群咖啡师在煮咖啡。那些是我们的执行者。 Spark Streaming 负责创建小批量用户(或订单),Spark 会将工作分配给咖啡师,以便我们可以并行化咖啡制作并加快咖啡供应。
【讨论】:
foreachRDD() 的 API 来 foreach RDD,因为只有一个 RDD,foreachRDD()可以命名为getRDD() 并返回RDD 对象。你的例子可以这样修改: val userDStream: DStream[User] = ??? userDstream.getRDD.foreach{user => serveCoffee(user)}
foreachRDD() foreach 三个RDD(contains 9:00, 9:15, 9:30),但是假设现在是9:30,9:00和9:15的RDD是之前处理的,如果使用foreachRDD(),则只能处理一个9:30的RDD,9:30之前9:30之前的RDD怎么处理? 9:30 之前的数据或 RDD 会在 9:30 消失,不是吗?在Spark steaming中,每个时间间隔处理当前批次间隔的数据,为什么要在9:30处理6:00的RDD,6:00的RDD应该在6:00处理。
foreachRDD 理解为scheduleOperationOnRddForEachTimeInterval(rdd => operation(rdd))。
foreachRDD),当我们尝试启动它时会抛出异常:No output operations registered, so nothing to execute