【发布时间】:2020-05-26 21:35:34
【问题描述】:
我有多个生产者将数据输入 Kafka。我希望每小时运行一个消费者,以一次获取所有累积的数据并进一步处理。
我想到的选项是:
- 使用 python 线程并使用等效的 setInterval 来调用消费者
- 设置 max_poll_interval_ms 变量:(如其他几个答案中所述)。但是,官方文件指出
这为消费者可以使用的时间设置了一个上限 在获取更多记录之前处于空闲状态。如果之前没有调用 poll() 此超时到期,则认为消费者失败并且 小组将重新平衡 这听起来不像是它负责让消费者进入睡眠状态然后再次触发它。
- 我不是每小时轮询一次,而是跟踪消费者偏移量并在 10,000 条记录附加到 Kafka 后进行轮询
但是,我想在 Consumer 本身内进行相同的管理。最好的方法是什么 ?
【问题讨论】:
-
您可能想检查设置一个您愿意处理的预定义数据量(通过
fetch.min.bytes)是否比等待预定义时间更好。
标签: apache-kafka kafka-consumer-api polling kafka-python