【问题标题】:Is there any way we can pause kafka stream for certain period and resume later?有什么方法可以暂停kafka流一段时间然后再恢复?
【发布时间】:2020-03-02 21:25:11
【问题描述】:

我们有一个要求,我们使用 Kafka Streams 从 Kafka 主题中读取数据,然后通过会话池通过网络发送数据。但是,有时,网络调用有点慢,我们需要经常暂停流,确保我们没有超载网络。目前,我们将数据捕获到流中并将其加载到执行器服务,然后通过会话池通过网络发送。

如果执行器服务中的数据过多,我们需要将流暂停一段时间,然后在执行器服务的积压清除后恢复它。为了实现这种暂停机制,我们目前正在关闭流并在积压清除后重新开始。

有什么方法可以暂停 kafka 流吗?

【问题讨论】:

  • 您可以修改流程,以便 Kafka Stream 写入另一个主题。并让 Executor 服务根据网络负载的需要读取该主题。
  • pause()/resume() 的底层消费者 API 在 Kafka Streams 中不可用。如果你真的需要它,你需要编写一个普通的 KafkaConsumer 应用程序,而不是使用 Streams API。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

如果我对您的理解正确,您无需做任何特别的事情。您说的是“背压”,Kafka Streams 可以开箱即用地处理它。

可以做的是将这些数据放入一个最大大小的队列中,并使用这个队列加载到执行器服务中。每当队列达到某个阈值时,有两种方法:

  • 如果您将数据放入队列的调用被阻塞且没有超时,您无需再做任何事情。等到系统重新上线,你的电话 退货,处理将继续。
  • 如果您将数据放入队列的调用因超时而阻塞,只需发出查找以检查队列的大小。重复此操作,直到系统重新联机并且您的呼叫成功。

唯一需要注意的是,只要您的 Streams 应用程序阻塞,内部使用的 Kafka 消费者客户端就不会向 Kafka 发送任何心跳,并且可能会超时。因此,您需要将超时配置参数设置为高于外部系统的预期最大停机时间。

另一种方法是使用 Kafka-streams 中提供的处理器 API,但通常不推荐使用这种模式。

如果有帮助请告诉我!!

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-12-08
  • 1970-01-01
  • 2021-11-25
  • 1970-01-01
  • 2020-10-02
  • 2021-06-12
  • 2020-11-11
相关资源
最近更新 更多