【发布时间】:2018-10-27 22:43:39
【问题描述】:
我想统计来自 kakfa 主题的消息。
例如我有这个案例类:
case class Message(timestamp: LocalDateTime)
我收到了这堂课的消息,我想数一数我在 1 小时内收到了多少条消息。假设消息在该主题中排序(时间戳对应消息进入主题的时间)。
我想创建一个这样的案例类:
case class Counter(datetime: LocalDateTime, count: Int)
假设我第一个小时有 100 条消息,那么我将有 150 条消息:
Counter("2018-05-17 00:00:00", 100)
Counter("2018-05-17 00:01:00", 150)
你知道怎么做吗?有关我不能/不想使用 kafka-streams 的信息。
谢谢!
编辑:
我的来源是我想与消费者 API 一起使用的 kafka 主题。我的接收器是一个 postgresql 表。
【问题讨论】:
-
你会使用 Akka Streams Kafka 吗?
-
我真的不知道这个框架,但如果这“只是”一个库,如果你认为它比只使用 kafka 更好,我愿意听听
-
实际上流的使用取决于你的源和接收器,所以如果你的源和接收器都是kafka,那么使用kafka流,如果是别的东西,那么Akka流会更好。
-
那么你想知道消费者端的消息数量吗?
-
你打算如何使用消费者 API 来消费来自主题的消息?
标签: scala apache-kafka