【问题标题】:Query for streaming dataset in Spark在 Spark 中查询流式数据集
【发布时间】:2020-05-04 17:23:42
【问题描述】:

我有一个 流式传输 数据集,其中包含以下列:bag_id、ball_color。我想为每个包找到最流行的颜色。所以,我尝试了:

dataset.groupBy("bag_id", "color") # 1st aggregation
       .agg(count("color").as("color_count"))
       .groupBy("bag_id") # 2nd aggregation
       .agg(max("color_count"))

但我有一个错误:

线程“主”org.apache.spark.sql.AnalysisException 中的异常: 流式传输不支持多个流式聚合 数据帧/数据集;;

我可以只用一个聚合函数创建正确的查询吗?

【问题讨论】:

  • 你试过我的建议了吗?
  • @ggeop 感谢您的帮助。我用函数 sum 解决了我的问题
  • 完美,好消息 :-) 我也遇到了同样的问题,但我没有这种灵活性来使用一个聚合,所以我使用了foreachBatch() 方法。

标签: apache-spark pyspark apache-spark-sql dataset spark-structured-streaming


【解决方案1】:

是的,Spark 2.4.4(目前最新)尚不支持多流聚合。但是,作为一种解决方法,您可以使用 .foreachBatch() method

def foreach_batch_function(df, epoch_id):
  df.groupBy("bag_id","color")
  .agg(count("color").as("color_count"))
  .groupBy("bag_id").agg(max("color_count"))
  .show() # .show() is a dummy action

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  

.foreachBatch() 中,df 不是流式df,所以你可以做任何你想做的事情。

【讨论】:

    【解决方案2】:

    有一个开放的 Jira 解决了这个问题 Spark-26655,到目前为止,我们无法对流数据运行多个聚合。

    一种解决方法是执行 one aggregation 并保存回 Kafka..etc 并再次从 kafka 读取以执行另一次聚合。

    (or)

    我们只能对流数据运行一个聚合并将其保存到 HDFS/Hive/HBase 并获取以执行其他聚合(这将是单独的工作)

    【讨论】:

      猜你喜欢
      • 2023-03-22
      • 2021-02-06
      • 2018-08-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多