【问题标题】:How can I count the average per Key or Grouping of records from Spark Streaming DStream?如何计算 Spark Streaming DStream 中每个键或记录分组的平均值?
【发布时间】:2018-09-09 22:17:59
【问题描述】:

我对 Spark Scala 有疑问,我想从 dstream 数据中计算平均值,我像这样从 kafka 获取数据到 dstream,

[(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

我想这样数他们,

[(2,(110+130+120)/3),(3,(200+206+206)/3),(4,(150+160+170)/3)]

那么,得到这样的结果,

[(2,120),(3,204),(4,160)]

如何使用 dstream 中的 scala 执行此操作?我使用火花版本 1.6

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    使用map转换输入(x, y) -

    [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

    到 (x, (y, 1)

    [(2,(110, 1)),(2,(130, 1)),(2,(120, 1)),(3,(200, 1)),(3,(206, 1)),(3,(206, 1)),(4,(150, 1)),(4,(160, 1),(4,(170, 1))]

    现在,通过编写一个 reduce 函数来使用 redudceByKeyAndWindow,它将添加两条记录: (x, (y1, 1)) 和 (x,(y2, 1)) 到 (x, (y1+y2, 1+1)

    [(2,(360, 3)),(3,(612, 3)),(4,(480, 3))]

    现在再次运行地图以获取平均值 - (x, (y1, y2)) 到 (x, (y1/y2))

    [(2,120),(3,204),(4,160)]

    【讨论】:

    • 谢谢。我这样使用 val agg_rdd = rdd.aggregateByKey((0,0))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val sum = agg_rdd.mapValues(x => (x._1/x._2)) sum.collect跨度>
    猜你喜欢
    • 2015-11-03
    • 1970-01-01
    • 2016-06-17
    • 1970-01-01
    • 1970-01-01
    • 2017-02-20
    • 2016-03-28
    • 2018-02-15
    • 2021-11-01
    相关资源
    最近更新 更多