【发布时间】:2020-10-09 08:18:47
【问题描述】:
我正在尝试使用 pyspark 流基于对其他列的 groupby 操作来获取列的不同值,但我得到了正确的计数。
Function created:
from pyspark.sql.functions import weekofyear,window,approx_count_distinct
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week",weekofyear("eventDate"))
#.groupBy(window(col(("week")).cast("timestamp"),"5 minute")).approx_count_distinct("device_id")
# .withColumn("WAU",col("window.start"))
# .drop("window")
.groupBy("week").agg(approx_distinct.count("device_id").alias("WAU"))
.writeStream
.format("delta")
.option("checkpointLocation",goldPath + "/_checkpoint")
#.option("streamName",queryName)
.queryName(queryName)
.outputMode("complete")
.start(goldPath)
#return queryName
)
Expected Result:
week WAU
1 7
2 4
3 9
4 9
Actual Result:
week WAU
1 7259
2 7427
3 7739
4 7076
示例输入数据:
以文本格式输入数据:
device_id,eventName,client_event_time,eventDate,deviceType 00007d948fbe4d239b45fe59bfbb7e64,scoreAdjustment,2018-06-01T16:55:40.000+0000,2018-06-01,android 00007d948fbe4d239b45fe59bfbb7e64,scoreAdjustment,2018-06-01T16:55:34.000+0000,2018-06-01,android 0000a99151154e4eb14c675e8b42db34,scoreAdjustment,2019-08-18T13:39:36.000+0000,2019-08-18,ios 0000b1e931d947b197385ac1cbb25779,分数调整,2018-07-16T09:13:45.000+0000,2018-07-16,android 0003939e705949e4a184e0a853b6e0af,分数调整,2018-07-17T17:59:05.000+0000,2018-07-17,android 0003e14ca9ba4198b51cec7d2761d391,scoreAdjustment,2018-06-10T09:09:12.000+0000,2018-06-10,ios 00056f7c73c9497180f2e0900a0626e3,scoreAdjustment,2019-07-05T18:31:10.000+0000,2019-07-05,ios 0006ace2d1db46ba94b802d80a43c20f,scoreAdjustment,2018-07-05T14:31:43.000+0000,2018-07-05,ios 000718c45e164fb2b017f146a6b66b7e,分数调整,2019-03-26T08:25:08.000+0000,2019-03-26,android 000807f2ea524bd0b7e27df8d44ab930,purchaseEvent,2019-03-26T22:28:17.000+0000,2019-03-26,android
对此有任何建议
【问题讨论】:
-
您能否将其更改为批量查询并向我们展示要使用的输入数据?你为什么用
approx_distinct.count而不是count? -
因为它是流式传输的,为了获得不同的计数,我使用了 approx_distinct.count。您能否提供任何参考以使用 count() 来获取 device_id 的不同,因为不同的计数不能用于流式传输。
-
在讨论其他方法(例如计数)之前,让我们更多地了解您使用的数据。我仍然认为结果不正确。你如何证明不正确?让我们也这样做。谢谢。
-
这个没试过批量查询。将粘贴内容
-
添加了文本格式的输入数据
标签: apache-spark hadoop pyspark spark-structured-streaming delta-lake