【问题标题】:InstanceAlreadyExistsException coming from kafka consumer来自 kafka 消费者的 InstanceAlreadyExistsException
【发布时间】:2017-04-14 07:57:31
【问题描述】:

我正在与 Kafka 合作,并尝试按照article 设置消费者组。唯一的区别是我创建了自己的抽象类和处理程序以简化设计。

下面是我的抽象类:

public abstract class Consumer implements Runnable {
  private final Properties consumerProps;
  private final String consumerName;

  public Consumer(String consumerName, Properties consumerProps) {
    this.consumerName = consumerName;
    this.consumerProps = consumerProps;
  }

  protected abstract void shutdown();

  protected abstract void run(String consumerName, Properties consumerProps);

  @Override
  public final void run() {
    run(consumerName, consumerProps);
  }
}

下面是我的KafkaConsumerA,它扩展了抽象类:

public class KafkaConsumerA extends Consumer {
  private KafkaConsumer<byte[], DataHolder> consumer;

  public KafkaConsumerA(String consumerName, Properties consumerProps) {
    super(consumerName, consumerProps);
  }

  @Override
  public void shutdown() {
    consumer.wakeup();
  }

  @Override
  protected void run(String consumerName, Properties consumerProps) {
    // exception comes from below line from two of the threads and the remaining one thread works fine.
    consumer = new KafkaConsumer<>(consumerProps);
    List<String> topics = getTopicsBasisOnConsumerName(consumerName);
    try {
      consumer.subscribe(topics);
      // Setup the schema config
      Map<String, Object> config = new HashMap<>();
      config.put("urls", "https://abc.qa.host.com");

      GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
      while (true) {
        ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
        for (ConsumerRecord<byte[], DataHolder> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out
              .println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
        }
      }
    } catch (WakeupException ex) {
      ex.printStackTrace();
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}

下面是我的 Handler 类:

// looks like something is wrong in this class
public final class ConsumerHandler {
  private final ExecutorService executorServiceProcess;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      consumers.add(consumer);
      executorServiceProcess.submit(consumer);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceProcess.shutdown();
        try {
          executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

这是我从主类开始消费组中的所有消费者:

  public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here

     handlerA.shutdown();
     // shutdown KafkaConsumerB here
  }

因此,我的计划是在KafkaConsumerA 中建立一个包含三个消费者的消费者组,并且所有三个消费者都订阅了相同的主题。

错误:-

每当我运行它时,看起来消费者组中只有一个消费者有效,而另外两个无效。我在这两个控制台上看到了这个异常:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]

我在这里做错了什么? getConsumerProps() 方法返回属性对象,其中包含 client.idgroup.id,对于该消费者组中的所有三个消费者具有相同的值。

以下是我的设计细节:

  • 我的KafkaConsumerA 将在一个消费者组中拥有三个消费者,每个消费者将在topicA 上工作。
  • 我的 KafkaConsumerB(类似于 KafkaConsumerA)将有两个消费者在不同的消费者组中,每个消费者都将在 topicB 上工作。

这两个消费者KafkaConsumerAKafkaConsumerB 将运行在同一个盒子上,不同的消费者组相互独立。

【问题讨论】:

    标签: java multithreading apache-kafka kafka-consumer-api


    【解决方案1】:

    Kafka 正在尝试注册 MBeans 以进行应用程序监控,并且正在使用 client.id 进行此操作。正如您所说,您在抽象类中注入了属性,并为A 组中的每个消费者注入相同的client.idgroup.id。但是,您有不同的客户,因此您应该给他们自己的client.id,但保持相同的group.id。这将在同一消费者组中注册不同的客户端/消费者并使它们一起工作,但不会在 MBean 注册上发生冲突。

    【讨论】:

    • 我明白了。谢谢(你的)信息。我会试试看。你还认为我的设计和逻辑是否适合我正在尝试做的事情。我在问题的底部提到了它。或者您是否看到任何可以改进的设计?
    • 这是一个非常合理的设置,但我当然不知道你的域。
    • 我的想法是在同一个 JVM 上运行多个 kafka 消费者。而且每个 kafka 消费者都会有多个线程。例如:KafkaConsumerA 将有三个消费者在一个消费者组中工作 topicAKafkaConsumerB 将有 2 个消费者在另一个消费者组中工作 topicB。这两个KafkaConsumerAKafkaConsumerB 都将在单个JVM 上运行。
    • 对我来说听起来完全可行。
    • ok.. 这个错误是否也会影响我在该消费者组中的消费者,就像它不会从 kafka 接收任何数据一样?还是只是一条警告消息,告诉 MBeans 注册失败?
    【解决方案2】:

    我知道这是一个老问题,但考虑到这些天我们经常使用注释。所以添加了另一种风格的问题和答案。 我们面临同样的问题,但我们一直在同一个应用程序中的 2 个消费者中使用 @KafkaListener 注释,并且大多数属性直接注入

    @KafkaListener(
    topics = "${app.source}",
    groupId = "${app.kafka.consumer.group-id}",
    clientIdPrefix = "subscriber",
    containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
    errorHandler = "customConsumerAwareListenerErrorHandler"
    )
    

    我们的消费者有类似的实现,但连接到不同的主题,所以我们只是修改了“clientIdPrefix”,以便在实例化期间为他们提供唯一值。所以最终的代码是

    @KafkaListener(
    topics = "${app.source}",
    groupId = "${app.kafka.consumer.group-id}",
    clientIdPrefix = "firstSubscriber",
    containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
    errorHandler = "customConsumerAwareListenerErrorHandler"
    )
    

    @KafkaListener(
    topics = "${app.source}",
    groupId = "${app.kafka.consumer.group-id}",
    clientIdPrefix = "secondSubscriber",
    containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
    errorHandler = "customConsumerAwareListenerErrorHandler"
    )
    

    【讨论】:

    • 为了更清楚,clientIdPrefix 完成了这项工作。
    猜你喜欢
    • 2019-08-10
    • 1970-01-01
    • 2019-07-01
    • 2023-02-02
    • 1970-01-01
    • 2021-07-09
    • 1970-01-01
    • 2022-08-16
    • 1970-01-01
    相关资源
    最近更新 更多