【问题标题】:What is the best way of polling at regular intervals a Kafka Consumer while using kafka-python?在使用 kafka-python 时定期轮询 Kafka Consumer 的最佳方式是什么?
【发布时间】:2020-05-26 21:35:34
【问题描述】:

我有多个生产者将数据输入 Kafka。我希望每小时运行一个消费者,以一次获取所有累积的数据并进一步处理。


我想到的选项是:

  • 使用 python 线程并使用等效的 setInterval 来调用消费者
  • 设置 ma​​x_poll_interval_ms 变量:(如其他几个答案中所述)。但是,官方文件指出

这为消费者可以使用的时间设置了一个上限 在获取更多记录之前处于空闲状态。如果之前没有调用 poll() 此超时到期,则认为消费者失败并且 小组将重新平衡 这听起来不像是它负责让消费者进入睡眠状态然后再次触发它。

  • 我不是每小时轮询一次,而是跟踪消费者偏移量并在 10,000 条记录附加到 Kafka 后进行轮询

但是,我想在 Consumer 本身内进行相同的管理。最好的方法是什么 ?

【问题讨论】:

  • 您可能想检查设置一个您愿意处理的预定义数据量(通过fetch.min.bytes)是否比等待预定义时间更好。

标签: apache-kafka kafka-consumer-api polling kafka-python


【解决方案1】:

如果你阅读官方文档max_poll_interval_ms,它是消费者可以空闲的最大时间间隔。之后,消费者被视为死亡,消费者群体重新平衡。

这就是为什么我建议您不要在 10k 之后关闭消费者。尽管使用偏移量进行轮询是一个很好的策略,但这样做存在问题。每个新的消费者偏移量并不意味着它是一条新消息。根据您的 auto.offset.reset 配置,您可能会收到重复的消息。

为了节省运行 pod 的成本,我建议你应该创建一个分区更少的主题。这可能会为您节省数据传输 + 存储成本。虽然实例应该保持运行。

【讨论】:

    【解决方案2】:

    使用 Cron 或您的操作系统调度程序每小时调用一次脚本。

    如果您需要等到 10k 条记录出现在主题上才能做任何有用的事情,那么我不完全确定 Kafka 是否适合该架构。另外,消费者滞后实际上会不断落后

    【讨论】:

    • 此要求适用于用于处理事件、压缩事件并推送到云存储桶的特定消费者。我不希望这个 Consumer Pod 持续运行以最小化成本,这就是为什么我希望将 Consumer 设置为间歇运行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-25
    • 1970-01-01
    • 2016-11-03
    • 2023-01-19
    • 2020-03-20
    • 1970-01-01
    相关资源
    最近更新 更多