【问题标题】:Spark Streaming: Writing number of rows read from a Kafka topicSpark Streaming:写入从 Kafka 主题读取的行数
【发布时间】:2018-11-10 02:13:32
【问题描述】:

Spark 流式传输作业正在从繁忙的 kafka 主题中读取事件。为了了解每个触发间隔有多少数据进入,我只想输出从主题读取的行数。我尝试了多种方法,但无法弄清楚。

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
//          .option("failOnDataLoss", false)
//          .option("maxOffsetsPerTrigger", 10000)
          .load();
      stream.selectExpr("topic").agg(count("topic")).as("count");
      //stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
      stream.writeStream()
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();

【问题讨论】:

  • stream.selectExpr 返回一个您忽略的新数据集,所以它只是写正在消费的内容
  • 非常感谢。这解决了这个问题。不知怎的,我错过了。

标签: java apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

看来你需要

stream = stream.selectExpr("topic").agg(count("topic")).as("count");

然后你就可以打印出来了

【讨论】:

    猜你喜欢
    • 2019-07-12
    • 2019-06-24
    • 1970-01-01
    • 1970-01-01
    • 2020-07-12
    • 2023-03-25
    • 2020-09-13
    • 1970-01-01
    • 2018-01-09
    相关资源
    最近更新 更多