【问题标题】:stream difference between column A and B aggregated by column C and DA 列和 B 列之间的流差由 C 列和 D 列汇总
【发布时间】:2020-02-29 11:22:18
【问题描述】:

如何将以下内容流式传输到表格中:

C 列和 D 列汇总的 A 列和 B 列之间的差异。

+-------------+-------------------+--+-
| Column_A|Column_B |Column_C|Column_D|
+-------------+-------------------+--+-
|52       |67       |boy     |car     |
|44       |25       |girl    |bike    |
|98       |85       |boy     |car     |
|52       |41       |girl    |car     |
+-------------+-------------------+--+-

这是我的尝试,但它不起作用:

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

我收到此错误:“GroupedData”对象没有属性“writeStream”

【问题讨论】:

    标签: python apache-spark pyspark spark-streaming


    【解决方案1】:

    取决于您希望如何聚合分组数据 - 例如,您可以这样做

    先决条件(以防您尚未设置):

    from pyspark.sql import functions as F 
    from pyspark.sql.functions import *
    

    对于sum

    difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))
    

    对于max

    difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))
    

    然后:

    differenceStream = difference.writeStream\
      .queryName("diff_aggr")\
      .format("memory").outputMode("append")\
      .start()
    

    关键是 - 如果你这样做 groupBy 你还需要通过聚合来减少。如果您想将值排序在一起,请尝试df.sort(...)

    【讨论】:

    • 如何将 C 列和 D 列聚合在一起?
    • 你的意思是:difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C", "Column_D").agg(F.max(F.col("Difference")).alias("Difference_max"), F.min(F.col("Difference")).alias("Difference_min"))?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-02
    • 2018-09-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多