【问题标题】:Structured Streaming Aggregations return wrong values结构化流式聚合返回错误值
【发布时间】:2018-02-23 14:48:37
【问题描述】:

我编写了一个结构化流聚合,它从 Kafka 源获取事件,执行简单的计数并将它们写回 Cassandra 数据库。代码如下所示:

val data = stream
  .groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
  .agg(functions.count("*").as("value"))

val query: StreamingQuery = data
  .writeStream
  .queryName("group-by-type")
  .format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
  .option("keyspace", "analytics")
  .option("table", "aggregations")
  .option("partitionKeyColumns", "project,type")
  .option("clusteringKeyColumns", "date")
  .start()

问题是计数刚刚超过每个批次。所以我会看到 Cassandra 的计数下降。计数不应超过一天,我怎样才能做到这一点?

编辑: 我也尝试过使用窗口聚合,同样的事情

【问题讨论】:

    标签: apache-spark cassandra spark-structured-streaming


    【解决方案1】:

    所以这种情况下的错误实际上不在我的查询或 Spark 中。 为了找出问题出在哪里,我使用了控制台接收器,而那个没有显示问题。

    问题出在我的 Cassandra 水槽中,如下所示:

    class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        data.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
      }
    }
    

    它使用 Datastax Spark Cassandra 连接器来写入数据帧。

    问题是变量data 包含一个流数据集。在 Spark 提供的 ConsoleSink 中,DataSet 在写入之前被复制到静态 DataSet 中。所以我改变了它,现在它可以工作了。完成的版本如下所示:

    class CassandraSink(sqlContext: SQLContext, keyspace: String, table: String) extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        val ds = data.sparkSession.createDataFrame(
          data.sparkSession.sparkContext.parallelize(data.collect()),
          data.schema
        )
        ds.write.mode(SaveMode.Append).cassandraFormat(table, keyspace).save()
      }
    }
    

    【讨论】:

      猜你喜欢
      • 2015-10-03
      • 1970-01-01
      • 2019-07-20
      • 1970-01-01
      • 1970-01-01
      • 2018-09-05
      • 2021-02-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多