【问题标题】:Error while fetching metadata with correlation id 92 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}获取相关 ID 为 92 的元数据时出错:{myTest=UNKNOWN_TOPIC_OR_PARTITION}
【发布时间】:2021-06-28 19:12:55
【问题描述】:

我创建了一个示例应用程序来检查我的生产者的代码。当我在没有分区键的情况下发送数据时,我的应用程序运行良好。但是,在为数据分区指定密钥时,我得到了错误:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

适用于消费者和生产者。我在互联网上搜索了很多,他们建议验证 kafka.acl 设置。我在 HDInsight 上使用 kafka,但我不知道如何验证它并解决此问题。

我的集群有以下配置:

  1. 头节点:2
  2. 工作节点:4
  3. 动物园管理员:3

我的生产者代码:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties used to configure the producer
    Properties properties = new Properties();
      // Set the brokers (bootstrap servers)
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // specify the protocol for Domain Joined clusters

    //To create an Idempotent Producer
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    // So we can generate random sentences
    Random random = new Random();
    String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature",
         };


    for(String sentence: sentences){
        // Send the sentence to the test topic
        try
        {
            String key=sentence.substring(0,2);
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
        }
        catch (Exception ex)
        {
          System.out.print(ex.getMessage());
            throw new IOException(ex.toString());
        }
        producer.commitTransaction();
    }
}

另外,我的主题由 3 个分区组成,复制因子=3

【问题讨论】:

    标签: java apache-kafka kafka-producer-api


    【解决方案1】:

    我使复制因子小于分区数,它对我有用。这对我来说听起来很奇怪,但是是的,它在它之后开始工作。

    【讨论】:

      【解决方案2】:

      该错误清楚地表明您正在生成的主题(或分区)不存在。

      最终,您需要描述主题(通过 CLI kafka-topics --describe --topic &lt;topicName&gt; 或其他方式)以验证这是否属实

      Kafka on HDInsight,我不知道如何验证它并解决这个问题。

      ACL 仅在您安装集群时才会设置,但我相信您仍然可以通过 zookeper-shell 或通过 SSH 连接到 Hadoop 主服务器之一列出 ACL。

      【讨论】:

      • 我已经确保我的主题存在,而且当我在没有分区键的情况下发送时代码运行得非常好。为什么添加分区键会报错?
      • 不确定“分区键”是什么意思。这将是“记录密钥”,即使您实际上没有设置一个,它仍然以null 发送。你用的是什么分区器?
      • 为了完整起见,在没有创建主题时也会发生同样的情况。
      【解决方案3】:

      我在创建新主题时也遇到了同样的问题。当我描述主题时,我可以看到领导者没有分配到主题分区。

      主题:xxxxxxxxx 分区:0 领导者:无 副本:3,2,1 Isr: 主题:xxxxxxxxx分区:1领导者:无副本:1,3,2 Isr:

      经过一番谷歌搜索,发现当我们与控制器代理出现问题时可能会发生这种情况,因此重新启动了控制器代理。

      一切都按预期工作......!

      【讨论】:

        猜你喜欢
        • 2020-05-15
        • 2021-10-09
        • 2021-11-21
        • 2020-12-27
        • 2020-01-05
        • 2016-07-26
        • 2016-10-20
        • 2019-10-06
        • 1970-01-01
        相关资源
        最近更新 更多