【问题标题】:Is there a way to prioritize messages in Apache Kafka 2.0?有没有办法在 Apache Kafka 2.0 中确定消息的优先级?
【发布时间】:2019-08-18 06:52:29
【问题描述】:

编辑

万一其他人处于这种特殊情况,我在调整消费者配置后得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(用于高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询优先级高的话题,不轮询低优先级的话题,除非high是空的:

    while(true) {
        final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
        final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);

        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records

轮询超时(.poll() 方法的参数)决定了如果没有要轮询的记录要等待多长时间。我为每个主题将其设置为非常短的时间,但您可以将其设置为较低的优先级,以确保当存在高优先级消息时它不会消耗宝贵的周期等待

max.poll.records 配置显然决定了在一次轮询中抓取的最大记录数。对于更高的优先级,这也可以设置得更高。

max.poll.interval.ms 配置确定轮询之间的时间 - 处理 max.poll.records 消息需要多长时间。澄清here.

另外,我相信暂停/恢复整个消费者/主题可以这样实现:

    kafkaConsumer.pause(kafkaConsumer.assignment())
    if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

我不确定这是否是最好的方法,但我在其他地方找不到很好的例子

我同意下面的 senseiwu 的观点,这并不是 Kafka 的真正正确用途。这是单线程处理,每个主题都有一个专门的消费者,但我会从这里开始改进这个过程。


背景

我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统经常是低带宽的(尽管在某些情况下带宽可能会很高),并且有小的、高优先级的消息必须在较大的文件等待时处理,或者处理缓慢以消耗更少的带宽。我们希望有不同优先级的主题。

我是 Kafka 的新手,但我尝试研究处理器 API 和 Kafka Streams 均未成功,尽管论坛上的某些帖子似乎说这是可行的。

处理器 API

当我尝试Processor API 时,我试图通过检查poll() 是否为空来确定高优先级KafkaConsumer 当前是否正在处理任何事情,然后希望poll() 与Med Priority Consumer 一起处理,但是第二个主题投票返回空。为了调用kafkaConsumer.pause(partitions),似乎也没有一种简单的方法可以让所有TopicPartition 关注某个主题。

Kafka 流

当我尝试KafkaStreams 时,我设置了一个流以从我的每个“优先级”主题消费,但无法检查KStreamKafkaStreams 实例是否连接到更高优先级的主题当前处于空闲或正在处理中。

我的代码基于this 文件

其他

我也尝试了这里的代码:priority-kafka-client,但它没有按预期工作,因为运行下载的测试文件有不同的优先级。

我找到了this 线程,其中一位开发人员说(解决为主题添加优先级的问题):“......用户可以通过暂停和恢复来实现此行为”。但我无法弄清楚他的意思是如何做到这一点。

我找到了this StackOverflow 的文章,但他们似乎使用的是非常旧的版本,我不清楚他们的映射功能应该如何工作。

结论

如果有人能告诉我他们是否认为这是值得追求的事情,我将非常感激。如果这不是 Apache Kafka 应该的工作方式,因为它破坏了从自动主题/分区处理中获得的好处,那很好,我会在别处寻找。然而,有很多情况下人们似乎在这方面取得了成功,我想尝试一下。谢谢。

【问题讨论】:

  • 您应该发布自己问题的答案并接受它,而不是更新您的问题 :) -- 顺便说一句:Kafka Streams 不适合,因为消息根据它们的时间戳进行优先级排序,什么对数据流处理有意义。

标签: apache-kafka priority-queue apache-kafka-streams


【解决方案1】:

这听起来像是您的应用程序中的一个设计问题 - kafka 最初被设计为一个提交日志,其中每条消息都以偏移量写入代理,并且各种消费者以它们提交的顺序以非常低的延迟和高吞吐量。鉴于分区而不是主题是 Kafka 中工作分配的基本单元,因此很难在本地实现主题级别的优先级。

我建议您调整您的设计以使用 Kafka 以外的其他架构组件,而不是试图剪掉您的脚以适应鞋子。您已经可以做的一件事是让您的生产者将文件上传到适当的文件存储并通过 Kafka 发送链接,包括元数据。然后根据带宽状态,您的消费者可以根据大文件的元数据来决定是否可以下载。这样,您可能更有可能拥有稳健的设计,而不是错误地使用 Kafka。

如果您确实只想坚持使用 Kafka,一种解决方案是将大文件发送到一些固定数量的硬编码分区,并且消费者仅在带宽良好时才从这些分区消费。

【讨论】:

    猜你喜欢
    • 2021-06-04
    • 2020-05-15
    • 2011-11-05
    • 1970-01-01
    • 1970-01-01
    • 2021-11-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多