【问题标题】:Waiting for leadership elections in KafkaJS等待KafkaJS中的领导选举
【发布时间】:2020-08-24 17:58:22
【问题描述】:

情况

我正在使用kafkajs 写入一些动态生成的 kafka 主题。

我发现在注册制作人后立即写这些主题会经常导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election

完整的错误是:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

代码

这是导致问题的代码:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

问题

两个问题:

  1. 在连接到生产者(或发送)时我应该做些什么特别的事情,以确保逻辑阻塞直到生产者真正准备好将数据发送到 kafka 主题?
  2. 在向生产者发送数据时我应该做些什么特别的事情以确保消息不会被丢弃?

【问题讨论】:

    标签: node.js apache-kafka kafkajs


    【解决方案1】:

    解决方案

    Kafkajs 通过admin client 提供了一个createTopics 方法,该方法有一个可选的waitForLeaders 标志:

    admin.createTopics({
      waitForLeaders: true,
      topics: [
        { topic: 'myRandomTopicString123' },
      ],
    }
    

    使用它可以解决问题。

    import kafka from 'myConfiguredKafkaJs'
    
    const run = async () => {
      const producer = kafka.producer()
      const admin = kafka.admin()
      await admin.connect()
      await producer.connect()
      await admin.createTopics({
        waitForLeaders: true,
        topics: [
          { topic: 'myRandomTopicString123' },
        ],
      })
      producer.send({
        topic: 'myRandomTopicString',
        messages: [{
          value: 'yolo',
        }],
      })
    }
    
    run()
    

    不幸的是,如果主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑错误比破坏更能提供信息。

    {"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
    

    编辑:上述设置确实需要正确配置您的 Kafka 实例。领导选举可能永远无法解决,在这种情况下,KafkaJS 仍然会抱怨领导选举!

    根据我的经验,这是由于 kafka 代理在没有从 zookeeper 注销的情况下被停止,这意味着 zookeeper 正在等待不再存在的东西的响应。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-30
      • 2020-04-24
      • 2012-05-08
      • 1970-01-01
      相关资源
      最近更新 更多