【问题标题】:Spark Streaming Sliding Window max and minSpark Streaming 滑动窗口最大值和最小值
【发布时间】:2017-04-03 23:09:44
【问题描述】:

我是 Spark 的初学者;我正在研究火花流用例,其中我收到一条 json 消息,每条 json 消息都有一个属性“值”,在解析 json 后它是双倍的我得到一个数组 [双]。我想找出最大值(值)和最小值(值) 持续 15 秒,滑动窗口为 2 秒。 这是我的代码。

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)

val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
                            .window(Seconds(15),Seconds(2))

      valueDtsream.foreachRDD
      {
         rdd => 
           if (!rdd.partitions.isEmpty)
           {
               //code to find min and max
           }
      }

ssc.start()
ssc.awaitTermination()

【问题讨论】:

    标签: json scala hadoop apache-spark spark-streaming


    【解决方案1】:

    试试:

    valueDtsream.transform( rdd => {
      val stats = rdd.flatMap(x => x).stats
      rdd.sparkContext.parallelize(Seq((stats.min, stats.max)))
    })
    

    【讨论】:

    • 谢谢你的回答,对不起这个愚蠢的问题为什么我需要转换并再次并行化。下面的代码不足以每 15 秒在控制台上打印最大值、最小值。 valueDtsream.foreachRDD { rdd => if (!rdd.partitions.isEmpty) { val stats = rdd.flatMap(x => x) println(stats.min, stats.max) } }
    • 如果您只想打印,您可以使用foreachRDD 并删除其余部分。 parallelize 因为transformRDD => RDD
    • 谢谢,我接受了这个问题的答案我还有 1 个问题我尝试了代码(没有转换和并行化),但我的 spark 流式作业是每 2 秒打印一次最小值,最大值理想情况下它应该只在窗口 15 秒后打印最小值,最大值,但这没有发生
    • 您有 15 秒的窗口,步长为 2 秒,因此这是意料之中的。你想要 15 秒的步数吗?
    • 是的,我需要 15 秒窗口
    猜你喜欢
    • 2012-05-30
    • 2012-06-03
    • 1970-01-01
    • 1970-01-01
    • 2020-06-21
    • 2019-07-09
    • 1970-01-01
    • 2016-08-12
    • 2019-10-21
    相关资源
    最近更新 更多