【问题标题】:Stop a Kafka Streams app停止 Kafka Streams 应用程序
【发布时间】:2016-12-27 04:45:15
【问题描述】:

是否有可能有一个 Kafka Streams 应用程序运行一个主题中的所有数据然后退出?

示例我根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后......做什么?我不希望它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。

可能吗?

【问题讨论】:

  • I don't want it to sit and wait for more data。这不是流的概念吗?您说的是批处理调度,而不是流式传输。
  • 我有点困惑你为什么接受下面的答案。如果我们谈论Kafka Streams库,消费者是内部管理的,您无法访问它......那么下面显示的方法应该如何工作?

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

您可以创建一个consumer,然后一旦它停止提取数据,您就可以调用consumer.close()。或者,如果您以后想再次投票,只需致电consumer.pause() 并稍后致电.resume

执行此操作的一种方法是在消费者轮询块中。比如

data = consumer.poll()
if (!data.next()) {
   consumer.close()
}

请记住,poll 返回 ConsumerRecord<K,V> 并符合 Iterable 接口。

【讨论】:

  • 此方法不适用于 Matthias J. Sax 对上述问题的评论中提到的 Kafka Streams 应用程序。
【解决方案2】:

在 Kafka Streams(与其他流处理解决方案一样)中,它不是“数据结束”,因为它首先是流处理,而不是批处理。

不过,您可以观察 Kafka Streams 应用程序的“滞后”,如果没有滞后则将其关闭(滞后,是尚未消费的消息数)。

例如,您可以使用bin/kafka-consumer-groups.sh 来检查您的 Streams 应用程序的延迟(应用程序 ID 用作消费者组 ID)。如果您想将其嵌入到您的 Streams 应用程序中,您可以使用 kafka.admin.AdminClient 获取消费者组信息。

【讨论】:

    猜你喜欢
    • 2018-07-17
    • 2022-01-23
    • 1970-01-01
    • 2023-03-09
    • 1970-01-01
    • 1970-01-01
    • 2019-04-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多