【问题标题】:How to use multi-thread consumer in kafka 0.9.0?如何在 kafka 0.9.0 中使用多线程消费者?
【发布时间】:2016-07-16 09:38:50
【问题描述】:

kafka 的文档给出了一个方法,描述如下:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例。

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if (!closed.get()) throw e;
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    public static void main(String[] args) {
        CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                .withBootstrapServers("172.31.1.159:9092")
                .withGroupId("test")
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
        executorService.shutdown();
    }
}

但它不起作用并抛出异常:

java.util.ConcurrentModificationException: KafkaConsumer 对多线程访问不安全

此外,我阅读了 Flink(分布式流和批处理数据处理的开源平台)的源代码。使用多线程消费者的 Flink 和我的类似。

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
    ConsumerRecords<byte[], byte[]> records;
    //noinspection SynchronizeOnNonFinalField
    synchronized (flinkKafkaConsumer.consumer) {
        try {
            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            if (running) {
                throw we;
            }
            // leave loop
            continue;
        }
    }

flink code of mutli-thread

怎么了?

【问题讨论】:

    标签: java multithreading apache-kafka distributed-computing apache-flink


    【解决方案1】:

    Kafka 消费者不是线程安全的。正如您在问题中指出的那样,该文件指出

    一个简单的选择是给每个线程一个自己的消费者实例

    但在您的代码中,您拥有由不同 KafkaConsumerRunner 实例包装的相同消费者实例。因此,多个线程正在访问同一个消费者实例。 kafka文档明确说明

    Kafka 消费者不是线程安全的。所有网络 I/O 都发生在 进行调用的应用程序的线程。这是责任 用户确保多线程访问正确 同步。非同步访问将导致 ConcurrentModificationException。

    这正是您收到的例外情况。

    【讨论】:

      【解决方案2】:

      它会在您调用订阅时引发异常。 this.consumer.subscribe(topicName);

      像这样将该块移动到同步块中:

      @Override
      public void run() {
          try {
              synchronized (consumer) {
                  this.consumer.subscribe(topicName);
              }
              ConsumerRecords<String, String> records;
              while (!closed.get()) {
                  synchronized (consumer) {
                      records = consumer.poll(100);
                  }
                  for (ConsumerRecord<String, String> tmp : records) {
                      System.out.println(tmp.value());
                  }
              }
          } catch (WakeupException e) {
              // Ignore exception if closing
              System.out.println(e);
              //if (!closed.get()) throw e;
          }
      }
      

      【讨论】:

      • 为我工作。
      【解决方案3】:

      也许不是你的情况,但如果你正在合并处理多个主题的数据,那么你可以从同一个消费者的多个主题中读取数据。如果没有,那么最好创建使用每个主题的单独作业。

      【讨论】:

        猜你喜欢
        • 2018-10-07
        • 2015-09-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-08-13
        • 1970-01-01
        • 2021-01-23
        相关资源
        最近更新 更多