【发布时间】:2018-02-23 14:48:37
【问题描述】:
我编写了一个结构化流聚合,它从 Kafka 源获取事件,执行简单的计数并将它们写回 Cassandra 数据库。代码如下所示:
val data = stream
.groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
.agg(functions.count("*").as("value"))
val query: StreamingQuery = data
.writeStream
.queryName("group-by-type")
.format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
.outputMode(OutputMode.Complete())
.option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
.option("keyspace", "analytics")
.option("table", "aggregations")
.option("partitionKeyColumns", "project,type")
.option("clusteringKeyColumns", "date")
.start()
问题是计数刚刚超过每个批次。所以我会看到 Cassandra 的计数下降。计数不应超过一天,我怎样才能做到这一点?
编辑: 我也尝试过使用窗口聚合,同样的事情
【问题讨论】:
标签: apache-spark cassandra spark-structured-streaming