【Question title】: Aggregation with distinct count in Spark structured streaming throwing error
【Posted】: 2020-08-27 01:03:12
【Question description】:

I am trying to get the number of unique IDs for each Parentgroup, childgroup and MountingType group in Spark Structured Streaming.

Code: the code below throws an error:

  .withWatermark("timestamp", "1 minutes")
val aggDF = JSONDF.groupBy("Parentgroup", "childgroup", "MountingType")
  .agg(countDistinct("id"))

Error:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Could someone please help me with how to aggregate and write to CSV in Structured Streaming? Many thanks.

Data:

{"id":"7CE3A7CA","Faulttime":1544362500,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"7CE3A7CA","Faulttime":1544362509,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"010004FF","Faulttime":1551339188,"name":"Philips","Parentgroup":"Light","childgroup":"Other","MountingType":"Solder"}
{"id":"010004FF","Faulttime":1551339188,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"Solder"}
{"id":"010004FF","Faulttime":1551339191,"name":"Sansui","Parentgroup":"AC","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"CE361405","Faulttime":1552159061,"name":"Hyndai","Parentgroup":"SBAR","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"CE361405","Faulttime":1552159061,"name":"sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"7BE446C0","Faulttime":1553022095,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"Solder"}
{"id":"7BE446C0","Faulttime":1553022095,"name":"Philips","Parentgroup":"LIGHT","childgroup":"Other","MountingType":"Solder"}
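The sample records carry their event time in `Faulttime` as epoch seconds. The code above, however, watermarks a column named `timestamp`, and that column does not appear in the data. The following is a minimal sketch. It assumes `timestamp` is meant to be derived from `Faulttime` and that the stream is read from JSON files, which need an explicit schema when used as a streaming source. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

// Hypothetical helper; the schema mirrors the fields of the sample records.
object FaultEvents {
  val schema: StructType = new StructType()
    .add("id", StringType)
    .add("Faulttime", LongType) // event time, epoch seconds
    .add("name", StringType)
    .add("Parentgroup", StringType)
    .add("childgroup", StringType)
    .add("MountingType", StringType)

  // Casting a BIGINT to TIMESTAMP treats the value as seconds since the epoch,
  // producing the `timestamp` column that withWatermark("timestamp", ...) expects.
  def withEventTime(df: DataFrame): DataFrame =
    df.withColumn("timestamp", col("Faulttime").cast(TimestampType))

  // Assumption: new JSON files are dropped into `inputDir`.
  def stream(spark: SparkSession, inputDir: String): DataFrame =
    withEventTime(spark.readStream.schema(schema).json(inputDir))
}
```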

【Question discussion】:

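  • Please provide the error details.
  • @kavetiraviteja: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
  • @kavetiraviteja: Please help me... if possible.
  • Please see my comment.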

Tags: apache-spark spark-streaming


【Solution 1】:

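In Append mode, a streaming groupBy must have an event-time window (or the watermarked event-time column itself) among its grouping keys. Calling withWatermark alone is not enough: without such a key, Spark can never decide that a group is final and safe to emit.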
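The question's code calls withWatermark and still gets the error. The reason is that Spark checks, when the query starts, that at least one grouping key carries the watermark. Spark also rejects distinct aggregations such as countDistinct on streaming DataFrames. The sketch below contrasts the two rejected shapes with an accepted one. It is an illustration, not the answerer's code. It uses Spark's built-in `rate` source, which emits `timestamp` and `value` columns, in place of the JSON stream. The object name and the `key` column are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{approx_count_distinct, col, countDistinct, window}
import org.apache.spark.sql.streaming.StreamingQuery
import scala.util.Try

// Hypothetical demo object: query shapes that Spark accepts or rejects at start().
object AppendModeCheck {
  // Built-in "rate" source (columns: timestamp, value) standing in for the JSON stream.
  def source(spark: SparkSession): DataFrame =
    spark.readStream.format("rate").load()
      .withWatermark("timestamp", "1 minute")
      .withColumn("key", col("value") % 3)

  // Watermark is defined, but no grouping key carries it: rejected in Append mode.
  def withoutWindow(df: DataFrame): DataFrame =
    df.groupBy(col("key")).agg(approx_count_distinct("value"))

  // The window key is fine, but countDistinct is a distinct aggregation,
  // which streaming queries reject.
  def withWindowExact(df: DataFrame): DataFrame =
    df.groupBy(window(col("timestamp"), "5 minutes", "1 minute"), col("key"))
      .agg(countDistinct("value"))

  // Accepted: a window over the watermarked column plus an approximate distinct count.
  def withWindow(df: DataFrame): DataFrame =
    df.groupBy(window(col("timestamp"), "5 minutes", "1 minute"), col("key"))
      .agg(approx_count_distinct("value"))

  // The unsupported-operation checks run synchronously inside start().
  def startAppend(df: DataFrame, name: String): Try[StreamingQuery] =
    Try(df.writeStream.outputMode("append").format("memory").queryName(name).start())
}
```

If an exact distinct count is required, it has to be computed outside the streaming aggregation. approx_count_distinct trades a small estimation error for bounded state.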

Try this:

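Sketch (Scala, assuming `df` already has an event-time column named "timestamp"):
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}
val JSONDF = df.withWatermark("timestamp", "1 minute")
val aggDF = JSONDF
  .groupBy(window(col("timestamp"), "5 minutes", "1 minute"), col("Parentgroup"), col("childgroup"), col("MountingType"))
  .agg(approx_count_distinct("id"))  // exact countDistinct is rejected on streaming DataFrames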

Reference: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
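For the CSV part of the question, here is a minimal end-to-end sketch under these assumptions: JSON files arrive in a directory, `Faulttime` is epoch seconds, and the directory paths and object name are hypothetical. The file sink supports only Append mode. CSV also cannot store the struct-typed `window` column, so the window bounds are flattened into plain columns before writing.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical object; inputDir, outputDir and checkpointDir are caller-supplied paths.
object DistinctIdsToCsv {
  def start(spark: SparkSession, inputDir: String, outputDir: String,
            checkpointDir: String): StreamingQuery = {
    val events = spark.readStream
      .schema("id STRING, Faulttime BIGINT, name STRING, Parentgroup STRING, childgroup STRING, MountingType STRING")
      .json(inputDir)
      // Faulttime holds epoch seconds; the cast yields the event-time column.
      .withColumn("timestamp", col("Faulttime").cast("timestamp"))

    val agg = events
      .withWatermark("timestamp", "1 minute")
      .groupBy(window(col("timestamp"), "5 minutes", "1 minute"),
        col("Parentgroup"), col("childgroup"), col("MountingType"))
      .agg(approx_count_distinct("id").as("distinct_ids"))
      // CSV cannot hold the struct-typed `window` column, so flatten it.
      .select(col("window.start").as("window_start"), col("window.end").as("window_end"),
        col("Parentgroup"), col("childgroup"), col("MountingType"), col("distinct_ids"))

    agg.writeStream
      .outputMode("append") // the file sink supports only Append
      .format("csv")
      .option("header", "true")
      .option("checkpointLocation", checkpointDir)
      .start(outputDir)
  }
}
```

In Append mode, a window's row is written only after a later event pushes the watermark past that window's end. The CSV output therefore lags the input by roughly the window length plus the watermark delay.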

【Discussion】:
