【问题标题】:Unable to send message to kafka Producer using kafka-node无法使用 kafka-node 向 kafka Producer 发送消息
【发布时间】:2021-11-09 15:09:46
【问题描述】:

我使用的是 Kafka 框架提供的默认 server.properties/zookeeper.properties 文件。

我正在尝试创建一个简单的 NodeJS 应用程序,它将向生产者发送消息并使用它们。

下面是 NodeJS 代码。

config.js

module.exports = {
  kafka_topic: 'catalog',
  kafka_server: 'localhost:9092',
};

nodejs-producer.js

const kafka = require('kafka-node');
const config = require('./config');

try {

    // set the desired timeout in options
    const options = {
        timeout: 5000,
    };
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({kafkaHost: config.kafka_server, requestTimeout: 5000});
    const producer = new Producer(client);
    const kafka_topic = config.kafka_topic;
    let payloads = [
        {
            topic: kafka_topic,
            messages: 'This is test message'
        }
    ];

    producer.on('ready', async function() {
        let push_status = producer.send(payloads, (err, data) => {
            if (err) {
                console.log(err.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
            } else {
                console.log(data.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
            }
        });
    });

    producer.on('error', function(err) {
        console.log(err);
        console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
        throw err;
    });
}
catch(e) {
    console.log(e);
}

kafka 版本 = 2.8.0 kafka-node 版本 = 5.0.0

我收到错误 - 错误:LeaderNotAvailable

如何解决这个问题?我尝试在 server.properties 文件中使用不同的值,例如adverted.listeners,但没有得到解决方案。

【问题讨论】:

标签: apache-kafka kafka-producer-api kafka-node


【解决方案1】:

我在发送消息时也遇到了同样的问题。我通过在有效负载中添加一个分区解决了这个问题,并且消费者也使用了相同的分区。

Code I have used

【讨论】:

    【解决方案2】:

    我已经answered this problem here

    简而言之:在尝试向不存在的主题生成消息时会出现此问题。

    您可以将您的 kafka 安装配置为在这种情况下自动创建主题:接下来会发生什么 - 按顺序:您仍然会收到错误消息并且框架将创建主题。在我的情况下,我不得不再次重新生成相同的消息,但这是在旧版本的 Kafka 上。

    编辑: here a link 到一篇解释如何设置您的 kafka 配置以自动创建 kafka 主题的帖子。

    【讨论】:

    • 我已经通过 CLI 命令创建了主题。主题详情如下: -> 命令:/zookeeper-shell.sh ZooKeeper -server localhost:2181 get /brokers/topics/catalog -> 结果:{"partitions":{"0":[3,1], "1":[1,2],"2":[2,3]},"topic_id":"1nKW2XXXXXXmtkNQ_A","adding_replicas":{},"removing_replicas":{},"version":3}
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-14
    • 2019-01-16
    • 2013-07-22
    • 2020-08-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多