【问题标题】:Count messages from kafka topic all hour统计来自 kafka 主题的所有消息
【发布时间】:2018-10-27 22:43:39
【问题描述】:

我想统计来自 kakfa 主题的消息。

例如我有这个案例类:

case class Message(timestamp: LocalDateTime)

我收到了这堂课的消息,我想数一数我在 1 小时内收到了多少条消息。假设消息在该主题中排序(时间戳对应消息进入主题的时间)。

我想创建一个这样的案例类:

case class Counter(datetime: LocalDateTime, count: Int)

假设我第一个小时有 100 条消息,那么我将有 150 条消息:

Counter("2018-05-17 00:00:00", 100)
Counter("2018-05-17 00:01:00", 150)

你知道怎么做吗?有关我不能/不想使用 kafka-streams 的信息。

谢谢!

编辑:

我的来源是我想与消费者 API 一起使用的 kafka 主题。我的接收器是一个 postgresql 表。

【问题讨论】:

  • 你会使用 Akka Streams Kafka 吗?
  • 我真的不知道这个框架,但如果这“只是”一个库,如果你认为它比只使用 kafka 更好,我愿意听听
  • 实际上流的使用取决于你的源和接收器,所以如果你的源和接收器都是kafka,那么使用kafka流,如果是别的东西,那么Akka流会更好。
  • 那么你想知道消费者端的消息数量吗?
  • 你打算如何使用消费者 API 来消费来自主题的消息?

标签: scala apache-kafka


【解决方案1】:

我用Flink 想出了一个解决方案。

我在 Flink 中阅读了一些关于时间窗口的文档,这个 page 谈到了一个主题中的升序时间戳(这是我的情况)。

所以这里有一个解决方案:

  val inputStream: DataStream[Message] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[Message] = inputStream
    .assignAscendingTimestamps(_.timestamp)
  val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).sum(1)

它会计算 1 分钟的滚动窗口内的所有元素。

对于更具体的解决方案并到达Counter("2018-05-17 00:00:00", 100),我们必须扩展AllWindowFunction

  class CustomWindowFunction extends AllWindowFunction[Message, Counter, TimeWindow] {
  def apply(window: TimeWindow, input: Iterable[Message], out: Collector[Counter]): Unit = {
    out.collect(
      Counter(
        new LocalDateTime(window.getStart),
        input.size
      )
    )
  }
}

然后将其应用到我们的 timeStream:

  val inputStream: DataStream[MyClass] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[MyClass] = inputStream
    .assignAscendingTimestamps(_.timestamp)
  val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).apply(new CustomWindowFunction())

如果在我们的主题输入中我们有 Message 类,我们会在末尾获得 Counter 类。

这是我目前找到的“更好”的解决方案。

【讨论】:

    【解决方案2】:

    您想要的解决方案在流处理术语中通常称为 windowing,并且大多数流处理库都具有此功能。 Software Mill 的a good writeup 比较了 Spark Streaming、Flink、Kafka Streams 和 Akka Streams。

    您可以尝试自己实现它,但上面提到的库都经过了实战测试,并且具有简单、可读的 API。如果您不想使用 Kafka Streams,那么其中一个 cmets(Alpakka project 的一部分)中提到的Akka Streams Kafka 值得考虑。

    【讨论】:

    • 感谢您的回答。你知道我可以从哪里找到一些例子吗?对于 Flink,它将是顶级的,我已经在我的管道中使用了一个作业 Flink。
    猜你喜欢
    • 2021-10-26
    • 2021-12-10
    • 2019-11-15
    • 1970-01-01
    • 1970-01-01
    • 2018-09-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多