【问题标题】:Kafka Consumer to read from multiple topicsKafka Consumer 从多个主题中读取
【发布时间】:2015-08-24 23:37:25
【问题描述】:

我对卡夫卡很陌生。我正在创建两个主题并从两个生产者那里发布这两个主题。我有一个消费者消费来自两个主题的消息。这是因为我想按照优先级进行处理。

我从两个主题中都得到了一个流,但是一旦我开始迭代任何流的ConsumerItreator,它就会阻塞在那里。正如文档中所写,它将被阻止,直到收到新消息。

有人知道如何从单个 Kafka Consumer 读取两个主题和两个流吗?

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
                topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
                ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();

                while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
                {
                    byte[] bytes = highPrioerityIterator.next().message();
                    Object obj = null;
                    CLoudDataObject thunderDataObject = null;
                    try
                    {

                        obj = SerializationUtils.deserialize(bytes);
                        if (obj instanceof CLoudDataObject)
                        {
                            thunderDataObject = (CLoudDataObject) obj;
                            System.out.println(thunderDataObject);
                            // TODO Got the Thunder object here, now write code to send it to Thunder service.
                        }

                    }
                    catch (Exception e)
                    {
                    }
                }

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    如果您不想在高优先级消息之前处理低优先级消息,如何设置 consumer.timeout.ms 属性并捕获 ConsumerTimeoutException 以检测高优先级流到达最后一条可用消息?默认情况下,它设置为 -1 以阻止直到新消息到达。 (http://kafka.apache.org/07/configuration.html)

    下面解释了一种同时处理具有不同优先级的多个流的方法。

    Kafka 需要多线程编程。在您的情况下,两个主题的流需要由流的线程处理。因为每个线程都会独立运行来处理消息,所以一个阻塞流(线程)不会影响其他流。

    Java 的 ThreadPool 实现可以帮助创建多线程应用程序。您可以在此处找到示例实现:

    https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

    关于执行的优先级,您可以调用 Thread.currentThread.setPriority 方法,根据其服务的 Kafka 主题获得适当的线程优先级。

    【讨论】:

    • 感谢您的回答。但是仅仅通过设置线程优先级对我没有帮助。我的用例是首先使用高优先级主题,如果它是空的,然后从低优先级主题使用。请检查这篇文章中的答案,似乎“天空”已经实现了相同的功能:stackoverflow.com/questions/30655361/…
    • 现在我了解您的要求了。如何设置 consumer.timeout.ms 属性并捕获 ConsumerTimeoutException 以检测消费者到达最后一条可用消息?默认情况下,它设置为 -1 不会导致超时。 kafka.apache.org/07/configuration.html
    • 也解决不了问题。它无法处理以下场景:1)假设我们有两个主题“高”和“低”,而在“低”主题上,我们有非常大的消息流。因此,一旦它开始从“Low”读取消息,它就不会超时,直到“Low”流为空配置的超时时间(非常低为 100 毫秒)如果我错了请纠正我
    • 您的观点似乎正确。虽然我认为您的要求是首先使用高优先级主题,直到它为空。有问题,您能描述一下您想到的几种情况吗?
    • 假设我们有 1 个生产者发布“高”优先级主题和 100 个生产者发布“低”优先级主题。在我的用例中,我期望“低”优先级主题的流量很大。消费消息后,需要发送到一些不允许多连接的第三方云。所以我想总是在低之前消费高
    猜你喜欢
    • 2022-01-06
    • 1970-01-01
    • 2020-12-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多