【问题标题】:KafkaStream not receiving any messages from TopicKafkaStream 没有收到来自 Topic 的任何消息
【发布时间】:2017-04-01 02:36:35
【问题描述】:

我在玩 KafkaStreamsKafkaConnect 只是想从一个主题中消费消息。我为此主题设置了一个“标准”批量消费者,它就像一个魅力。我首先将几条记录发送到 Kafka,然后再使用它们。现在我想使用 Kakfa 流来做同样的事情,但我没有从主题中得到一条消息。这是我正在使用的消费者代码。

final int NUMBER_OF_PARTITIONS = 4;
final Properties consumerConfig = new Properties();
consumerConfig.setProperty("zookeeper.connect", RULE.getConfiguration().kafka.getZookeeperUrl());
consumerConfig.setProperty("backoff.increment.ms", "100");
consumerConfig.setProperty("group.id", "java-consumer-example");
consumerConfig.setProperty("consumer.timeout.ms", "1000000");
consumerConfig.setProperty("client.id", "someclient");
consumerConfig.setProperty("auto.offset.reset", "smallest");
consumerConfig.setProperty("enable.auto.commit", "false");
consumerConfig.setProperty("bootstrap.servers", RULE.getConfiguration().kafka.getHosts());

final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
final TopicFilter sourceTopicFilter = new Whitelist(RULE.getConfiguration().kafka.getTopic());

final VerifiableProperties decoderProps = new VerifiableProperties();
decoderProps.props().setProperty("schema.registry.url", RULE.getConfiguration().kafka.getRegistry());
decoderProps.props().setProperty("max.schemas.per.subject", "1");
final List<KafkaStream<String, Object>> streams = connector
    .createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS, new StringDecoder(decoderProps), new KafkaAvroDecoder(decoderProps));

final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARTITIONS);
for (final KafkaStream stream : streams) {
    executorService.submit(() -> {
        try {
            final ConsumerIterator it = stream.iterator();
            while (it.hasNext()) {
                final MessageAndMetadata messageAndMetadata = it.next();
                final String key = (String) messageAndMetadata.key();
                System.out.println("KEY" + key);
            }
        } catch (final Exception ex) {
            LOGGER.error("ERROR", ex);
        }
    });
}

我的问题是,我的代码一直在 it.hasNext() 条件下等待,直到达到超时。我可能在这里遗漏了一些细节,但不知道为什么我没有从这个话题中得到任何东西。作为这个测试的一部分,我有一个生产者在消费者开始之前将一些记录发送到这个主题,所以它不可能是一个偏移问题。非常欢迎任何想法。

【问题讨论】:

    标签: kafka-consumer-api apache-kafka-connect


    【解决方案1】:

    我找到了解决方案。该错误超出了我发布的代码。我为关闭 ExecutorService 提供的超时太短,所以它只是杀死了它,而没有提供足够的时间来完成消费者工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-07-30
      • 1970-01-01
      • 2013-07-22
      • 2013-08-13
      • 2018-11-21
      • 2021-12-10
      • 2021-01-29
      相关资源
      最近更新 更多