【问题标题】:How do I use multiple consumers in Kafka?如何在 Kafka 中使用多个消费者?
【发布时间】:2015-09-03 03:15:39
【问题描述】:

我是一名学习 Kafka 的新学生,在了解多个消费者方面遇到了一些基本问题,到目前为止,文章、文档等对这些消费者并没有太大帮助。

我尝试做的一件事是编写我自己的高级 Kafka 生产者和消费者并同时运行它们,向一个主题发布 100 条简单消息并让我的消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入第二个消费者从刚刚发布消息的同一主题中消费时,它没有收到任何消息。

据我了解,对于每个主题,您可以拥有来自不同消费者组的消费者,并且每个消费者组都将获得针对某个主题产生的消息的完整副本。它是否正确?如果没有,我设置多个消费者的正确方法是什么?这是我目前写的消费者类:

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }


    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }

    }
}

此外,我注意到最初我正在测试主题“测试”的上述消耗,只有一个分区。当我将另一个消费者添加到现有消费者组“testGroup”时,这触发了 Kafka 重新平衡,从而大大降低了我的消费延迟,以秒为单位。我认为这是重新平衡的问题,因为我只有一个分区,但是当我创建一个具有 6 个分区的新主题“多分区”时,出现了类似的问题,即向同一消费者组添加更多消费者会导致延迟问题。我环顾四周,人们告诉我应该使用多线程消费者——有人能解释一下吗?

【问题讨论】:

  • 有一个很好的例子,高级消费者here for kafka 0.8.1
  • @chrsblck 感谢您的链接。实际上,我之前已经检查过它,并且可能没有像我所能理解的那样理解它——您能否解释一下该示例如何使用线程?我不完全理解他们现在在做什么。
  • 一种方法是让线程数与给定主题的分区数相同。从文章中获取流列表List&lt;KafkaStream&lt;byte[], byte[]&gt;&gt; streams = consumerMap.get(topic); ...然后为每个线程分配一个分区executor.submit(new ConsumerTest(stream, threadNumber))

标签: java apache-kafka


【解决方案1】:

我认为您的问题在于 auto.offset.reset 属性。当一个新的消费者从一个分区读取并且没有先前提交的偏移量时, auto.offset.reset 属性用于决定起始偏移量应该是什么。如果您将其设置为“最大”(默认值),您将从最新(最后)消息开始阅读。如果将其设置为“最小”,则会收到第一条可用消息。

所以添加:

properties.put("auto.offset.reset", "smallest");

然后再试一次。

* 编辑 *

“smallest”和“largest”不久前被弃用了。您现在应该使用“最早”或“最新”。有任何问题,请查看docs

【讨论】:

  • 这是一个迟到的回应,但感谢克里斯!您的解决方案是正确的,在仔细查看了一些文档后,我应该注意到,在启动新消费者时,它被设置为仅使用最新发送的消息 - 除非设置了上述属性,否则不会使用预先存在的消息。
  • 这对我不起作用。还应该提到的是,“最小”不是该属性的有效值。有效值为:最新、最早或无。
  • @user3335999 我建议你打开另一个问题。听起来你的情况略有不同。
【解决方案2】:

如果您希望多个消费者使用相同的消息(如广播),您可以使用不同的消费者组生成它们,并在消费者配置中将 auto.offset.reset 设置为最小。 如果您希望多个消费者并行完成消费(在它们之间分配工作),您应该创建分区数> =消费者数。一个分区最多只能被一个消费者进程消费。但是一个消费者可以消费多个分区。

【讨论】:

    【解决方案3】:

    在文档here 中它说:“如果您提供的线程多于该主题的分区数,则某些线程将永远不会看到消息”。您可以将分区添加到您的主题吗?我的消费者组线程数等于我的主题中的分区数,并且每个线程都在获取消息。

    这是我的主题配置:

    buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
    Topic:recent-wins   PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: recent-wins  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: recent-wins  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: recent-wins  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    

    还有我的消费者:

    package com.cie.dispatcher.services;
    
    import com.cie.dispatcher.model.WinNotification;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.inject.Inject;
    import io.dropwizard.lifecycle.Managed;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * This will create three threads, assign them to a "group" and listen for  notifications on a topic.
     * Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
     * the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
     * lifecycle manager in dropwizard.
     * <p/>
     * Created by aakture on 6/15/15.
     */
    public class KafkaTopicListener implements Managed {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private int threadCount;
    private WinNotificationWorkflow winNotificationWorkflow;
    private ObjectMapper objectMapper;
    
    @Inject
    public KafkaTopicListener(String a_zookeeper,
                              String a_groupId, String a_topic,
                              int threadCount,
                              WinNotificationWorkflow winNotificationWorkflow,
                              ObjectMapper objectMapper) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
        this.threadCount = threadCount;
        this.winNotificationWorkflow = winNotificationWorkflow;
        this.objectMapper = objectMapper;
    }
    
    /**
     * Creates the config for a connection
     *
     * @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
     * @param groupId   the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
     * @return the config props
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
    
        return new ConsumerConfig(props);
    }
    
    public void stop() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted during shutdown, exiting uncleanly");
        }
        LOG.info("{} shutdown successfully", this.getClass().getName());
    }
    /**
     * Starts the listener
     */
    public void start() {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, new Integer(threadCount));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(threadCount);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ListenerThread(stream, threadNumber));
            threadNumber++;
        }
    }
    
    private class ListenerThread implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;
    
        public ListenerThread(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }
    
        public void run() {
            try {
                String message = null;
                LOG.info("started listener thread: {}", m_threadNumber);
                ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                while (it.hasNext()) {
                    try {
                        message = new String(it.next().message());
                        LOG.info("receive message by " + m_threadNumber + " : " + message);
                        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
                        winNotificationWorkflow.process(winNotification);
                    } catch (Exception ex) {
                        LOG.error("error processing queue for message: " + message, ex);
                    }
                }
                LOG.info("Shutting down listener thread: " + m_threadNumber);
            } catch (Exception ex) {
                LOG.error("error:", ex);
            }
        }
      }
    }
    

    【讨论】:

    • 您能否分享一下Kafka 1.0版本的示例,因为上面示例中使用的大多数类都已弃用。
    • 我不相信它当时已经发布了,我可能不会很快升级我的代码,抱歉。
    猜你喜欢
    • 1970-01-01
    • 2019-04-07
    • 1970-01-01
    • 2019-11-18
    • 2016-07-16
    • 1970-01-01
    • 1970-01-01
    • 2019-07-22
    相关资源
    最近更新 更多