【发布时间】:2019-08-18 06:52:29
【问题描述】:
编辑
万一其他人处于这种特殊情况,我在调整消费者配置后得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(用于高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询优先级高的话题,不轮询低优先级的话题,除非high是空的:
while(true) {
final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);
final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
if (!consumerRecordsHigh.isEmpty()) {
//process high pri records
} else {
final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
if (!consumerRecordsMed.isEmpty()) {
//process med pri records
轮询超时(.poll() 方法的参数)决定了如果没有要轮询的记录要等待多长时间。我为每个主题将其设置为非常短的时间,但您可以将其设置为较低的优先级,以确保当存在高优先级消息时它不会消耗宝贵的周期等待
max.poll.records 配置显然决定了在一次轮询中抓取的最大记录数。对于更高的优先级,这也可以设置得更高。
max.poll.interval.ms 配置确定轮询之间的时间 - 处理 max.poll.records 消息需要多长时间。澄清here.
另外,我相信暂停/恢复整个消费者/主题可以这样实现:
kafkaConsumer.pause(kafkaConsumer.assignment())
if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
kafkaConsumer.resume(kafkaConsumer.assignment());
}
我不确定这是否是最好的方法,但我在其他地方找不到很好的例子
我同意下面的 senseiwu 的观点,这并不是 Kafka 的真正正确用途。这是单线程处理,每个主题都有一个专门的消费者,但我会从这里开始改进这个过程。
背景
我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统经常是低带宽的(尽管在某些情况下带宽可能会很高),并且有小的、高优先级的消息必须在较大的文件等待时处理,或者处理缓慢以消耗更少的带宽。我们希望有不同优先级的主题。
我是 Kafka 的新手,但我尝试研究处理器 API 和 Kafka Streams 均未成功,尽管论坛上的某些帖子似乎说这是可行的。
处理器 API
当我尝试Processor API 时,我试图通过检查poll() 是否为空来确定高优先级KafkaConsumer 当前是否正在处理任何事情,然后希望poll() 与Med Priority Consumer 一起处理,但是第二个主题投票返回空。为了调用kafkaConsumer.pause(partitions),似乎也没有一种简单的方法可以让所有TopicPartition 关注某个主题。
Kafka 流
当我尝试KafkaStreams 时,我设置了一个流以从我的每个“优先级”主题消费,但无法检查KStream 或KafkaStreams 实例是否连接到更高优先级的主题当前处于空闲或正在处理中。
我的代码基于this 文件
其他
我也尝试了这里的代码:priority-kafka-client,但它没有按预期工作,因为运行下载的测试文件有不同的优先级。
我找到了this 线程,其中一位开发人员说(解决为主题添加优先级的问题):“......用户可以通过暂停和恢复来实现此行为”。但我无法弄清楚他的意思是如何做到这一点。
我找到了this StackOverflow 的文章,但他们似乎使用的是非常旧的版本,我不清楚他们的映射功能应该如何工作。
结论
如果有人能告诉我他们是否认为这是值得追求的事情,我将非常感激。如果这不是 Apache Kafka 应该的工作方式,因为它破坏了从自动主题/分区处理中获得的好处,那很好,我会在别处寻找。然而,有很多情况下人们似乎在这方面取得了成功,我想尝试一下。谢谢。
【问题讨论】:
-
您应该发布自己问题的答案并接受它,而不是更新您的问题 :) -- 顺便说一句:Kafka Streams 不适合,因为消息根据它们的时间戳进行优先级排序,什么对数据流处理有意义。
标签: apache-kafka priority-queue apache-kafka-streams