【Question Title】: How to know if the RDDs in a Spark Streaming window have finished the Spark job?
【Posted】: 2024-05-19 21:15:01
【Question】:

I am reading messages from Kafka in a Spark Streaming application.

Spark batch duration: 15 seconds. Spark window: 60 seconds.

var dstream = KafkaUtils.createDirectStream(); // ignore the arguments
var windowedStream = dstream.window(SparkWindow);
// delete data from REDIS
windowedStream.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        JavaFutureAction<Void> v = rdd.foreachPartitionAsync(partition -> {
            // collect error data across partitions and write those to REDIS
        }); // foreachPartitionAsync ends
    }
});
// fetchFromREDISAndProcess() -- once foreachRDD ends, fetch error data from REDIS and process it

There is a constraint: I must first collect the error records from every partition and RDD in the Spark window, and only then process them on the driver.

I will get 4 RDDs in every Spark window.
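(The 4-RDDs-per-window figure follows from the 60-second window over 15-second batches. A minimal sketch of the setup these numbers imply, where the app name and variable names are assumptions rather than part of the question:)

SparkConf conf = new SparkConf().setAppName("kafka-error-collector");              // hypothetical app name
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15)); // batch duration: 15 s
Duration SparkWindow = Durations.seconds(60);                                      // window: 60 s = 4 batches of 15 s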

Question: I want to read the data from REDIS after each window and process it before moving on to the next window. Is there a way to make sure my code runs every time a Spark window ends?

【Question Comments】:

Tags: apache-spark spark-streaming kafka-consumer-api


【Solution 1】:

You can use the following logic for this:

import java.util.concurrent.atomic.AtomicInteger;

var dstream = KafkaUtils.createDirectStream(); // ignore the arguments
var windowedStream = dstream.window(SparkWindow);
// delete data from REDIS
int rddsPerWindow = 4;                         // 60 s window / 15 s batch = 4 RDDs per window
AtomicInteger rddCount = new AtomicInteger(0); // foreachRDD runs on the driver, so a driver-side counter works

windowedStream.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        JavaFutureAction<Void> v = rdd.foreachPartitionAsync(partition -> {
            // collect error data across partitions and write those to REDIS
        }); // foreachPartitionAsync ends
    }

    // count every RDD (empty or not) so the window boundary does not drift
    if (rddCount.incrementAndGet() % rddsPerWindow == 0) {
        // true at every 4th RDD, i.e. where the window ends:
        // read the data from REDIS and process it here, before the new window starts
    }
});
// fetchFromREDISAndProcess() -- once foreachRDD ends, fetch error data from REDIS and process it
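
One caveat, as an assumption beyond the code above: foreachPartitionAsync is asynchronous, so when the 4th RDD is reached the REDIS writes for that window may still be in flight. If the REDIS read must only run after those writes complete, one option is to keep the returned JavaFutureAction objects and block on them at the window boundary. A minimal sketch, assuming a driver-side list named pendingWrites (hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.spark.api.java.JavaFutureAction;

List<JavaFutureAction<Void>> pendingWrites = new ArrayList<>(); // futures of the current window

// inside foreachRDD, keep the future instead of discarding it:
//     pendingWrites.add(rdd.foreachPartitionAsync(...));

// at the window boundary, before reading from REDIS:
for (JavaFutureAction<Void> f : pendingWrites) {
    try {
        f.get(); // blocks until the partition writes of that RDD have finished
    } catch (InterruptedException | ExecutionException e) {
        // handle or log a failed asynchronous write
    }
}
pendingWrites.clear(); // start collecting futures for the next window

Blocking on the futures inside foreachRDD keeps the per-window processing on the driver, which matches the constraint stated in the question.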

【Comments】:
