【发布时间】:2018-11-10 02:13:32
【问题描述】:
Spark 流式传输作业正在从繁忙的 kafka 主题中读取事件。为了了解每个触发间隔有多少数据进入,我只想输出从主题读取的行数。我尝试了多种方法,但无法弄清楚。
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
// .option("failOnDataLoss", false)
// .option("maxOffsetsPerTrigger", 10000)
.load();
stream.selectExpr("topic").agg(count("topic")).as("count");
//stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
stream.writeStream()
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start();
【问题讨论】:
-
stream.selectExpr返回一个您忽略的新数据集,所以它只是写正在消费的内容 -
非常感谢。这解决了这个问题。不知怎的,我错过了。
标签: java apache-spark apache-kafka spark-structured-streaming