【Question Title】: How to count items per time window?
【Posted】: 2018-10-28 20:04:04
【Question】:

I am trying to use Spark Structured Streaming to count the number of items coming from Kafka in each time window, with the code below:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object Counter extends App {
  val dateFormatter = new SimpleDateFormat("HH:mm:ss")
  val spark = ...
  import spark.implicits._

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ...)
    .option("subscribe", ...)
    .load()

  val windowDuration = "5 minutes"
  val counts = df
    .select("value").as[Array[Byte]]
    .map(decodeTimestampFromKafka).toDF("timestamp")
    .select($"timestamp" cast "timestamp")
    .withWatermark("timestamp", windowDuration)
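    // 5-minute windows sliding every 1 minute: each event falls into five windows.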
    .groupBy(window($"timestamp", windowDuration, "1 minute"))
    .count()
    .as[((Long, Long), Long)]

  val writer = new ForeachWriter[((Long, Long), Long)] {
    var partitionId: Long = _
    var version: Long = _

    def open(partitionId: Long, version: Long): Boolean = {
      this.partitionId = partitionId
      this.version = version
      true
    }

    def process(record: ((Long, Long), Long)): Unit = {
      val ((start, end), docs) = record
      val startDate = dateFormatter.format(new Date(start))
      val endDate = dateFormatter.format(new Date(end))
      val now = dateFormatter.format(new Date)
      println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs")
    }

    def close(errorOrNull: Throwable): Unit = {}
  }

  val query = counts
    .repartition(1)
    .writeStream
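    // Complete mode re-emits every window computed so far on each trigger.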
    .outputMode("complete")
    .foreach(writer)
    .start()

  query.awaitTermination()

  def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ...
}

I expected that, once every minute (the slide duration), it would output a single record (since the only aggregation key is the window) with the item count for the last 5 minutes (the window duration). However, 2-3 times per minute it outputs several records at once, as in this sample:

...
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:20, 22:43:20) 383
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:18, 22:43:19) 435
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:42:33, 22:42:34) 395
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:14, 22:43:14) 435
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:09, 22:43:09) 437
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:19, 22:43:19) 411
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:07, 22:43:07) 400
22:44:34|Counter$$anon$1@6eb68dd7|0|8: (22:43:17, 22:43:17) 392
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:37, 22:43:38) 420
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:25, 22:43:25) 395
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:22, 22:43:22) 416
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:00, 22:43:00) 438
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:43:41, 22:43:41) 426
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:13, 22:44:13) 132
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:02, 22:44:02) 128
22:44:44|Counter$$anon$1@5b70120f|0|9: (22:44:09, 22:44:09) 120
...

Changing the output mode to append seems to change the behavior, but it is still far from what I expected.

What is wrong in my assumptions about how this should work? Given the code above, how should the sample output be interpreted or used?
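
To make the window arithmetic concrete: with a 5-minute window sliding every 1 minute, any single event falls into five overlapping windows, so several rows per trigger are possible regardless of output mode. A minimal batch sketch (a hypothetical one-row dataset, reusing the spark session and implicits from the code above) shows the assignment:

import org.apache.spark.sql.functions.window

// Hypothetical single event at 22:43:20; window() assigns it to every
// 5-minute window that contains it, one window start per 1-minute slide.
val events = Seq("2018-10-28 22:43:20").toDF("ts")
  .select($"ts".cast("timestamp").as("timestamp"))

events
  .groupBy(window($"timestamp", "5 minutes", "1 minute"))
  .count()
  .orderBy($"window.start")
  .show(truncate = false)
// Five rows: windows starting at 22:39, 22:40, 22:41, 22:42, and 22:43.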

【Comments】:

    Tags: scala apache-spark spark-structured-streaming


    【Solution 1】:

    With withWatermark you are allowing events that arrive up to 5 minutes late to be counted and to update windows that were already computed; see handling late data and watermarking in the Spark guide.
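
    For reference, a minimal sketch of the same aggregation written with append output mode (reusing df and decodeTimestampFromKafka from the question; the console sink and the appendCounts name are just for illustration). In append mode a window's count is emitted exactly once, after the watermark (max event time seen minus 5 minutes) passes the window's end:

    val appendCounts = df
      .select("value").as[Array[Byte]]
      .map(decodeTimestampFromKafka).toDF("timestamp")
      .select($"timestamp" cast "timestamp")
      .withWatermark("timestamp", "5 minutes")
      .groupBy(window($"timestamp", "5 minutes", "1 minute"))
      .count()

    // Emit each window once it can no longer change, i.e. once the
    // watermark passes the window's end time.
    appendCounts.writeStream
      .outputMode("append")
      .format("console")
      .start()

    In complete mode, by contrast, the watermark never truncates the result table, so every window computed since the query started is re-emitted on each trigger, which is why the sample output shows many windows at once.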

    【Discussion】:
