【问题标题】:How to add timestamp to kafka 10 messages with nodejs如何使用nodejs为kafka 10消息添加时间戳
【发布时间】:2017-11-12 01:02:31
【问题描述】:

我使用 nodejs 在 node-red 中重新编写了一个 Kafka 生产者节点。 我注意到 Kafka10 添加了有关在发送给生产者的消息上附加时间戳的功能,但我发现只是 java 示例。 如何使用 nodejs 为消息添加时间戳?

下面我报告nodejs中producer是如何实现的:

function kafkaAdvancedNode(config) {
    RED.nodes.createNode(this,config);
    var topic = config.topic;
    var partition = config.partition;
    var clusterZookeeper = config.zkquorum;
    var debug = (config.debug == "debug");
    var node = this;
    var kafka = require('kafka-node');
    var HighLevelProducer = kafka.HighLevelProducer;
    var Client = kafka.Client;
    var topics = config.topics;
    var client = new Client(clusterZookeeper);

    try {
        this.on("input", function(msg) {
            var payloads = [];

            // check if multiple topics
            if (topics.indexOf(",") > -1){
                var topicArry = topics.split(',');

                for (i = 0; i < topicArry.length; i++) {
                    payloads.push({topic: topicArry[i], messages: msg.payload});
                }
            }
            else {
                if(partition == "" || !partition)
                    payloads = [{topic: topics, messages: msg.payload}];
                else
                    payloads = [{topic: topics, messages: msg.payload, partition: partition}];
            }

            producer.send(payloads, function(err, data){
                if (err){
                    node.error(err);
                }
                node.log("Message Sent: " + data);
            });
        });
    }
    catch(e) {
        node.error(e);
    }
    var producer = new HighLevelProducer(client);
    this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
}

在谷歌上我发现了这个:link

但实际上我不明白如何在我的代码中集成(什么是 Avro?)

【问题讨论】:

    标签: node.js timestamp apache-kafka node-red kafka-producer-api


    【解决方案1】:

    我认为 kafka-node 不支持 0.10 Kafka 协议,因此它不太可能支持消息时间戳。

    出于这个(和其他)原因,我使用 node-rdkafka 编写了类似的内容并将其发布为 node-red-contrib-rdkafka。见https://github.com/hjespers/node-red-contrib-rdkafka

    更新:我刚刚发布了 node-red-contrib-rdkafka 的新版本 (0.2.0),它支持 0.10“事件时间”或“处理时间”时间戳以及动态主题、键和分区。

    【讨论】:

    • 不幸的是,在树莓派上,我在尝试安装它时收到错误消息。链接->imgur.com/a/idV4U
    • 我明白了,你没有提到这是在树莓派上运行的。这个错误可能是由于用 C 编写的嵌入式 librdkafka 库。我还编写了一个纯 JavaScript node-red-contrib-confluent 模块,用于像 rPI 这样的嵌入式设备。它使用 Confluent REST 代理向 Kafka 发布/订阅。这将自动将时间戳设置为摄取时间。它还可以通过防火墙和互联网更好地工作,因为它在代理之前使用 HTTP,并将 Kafka 协议限制在数据中心/云内部。
    • 对我不好,我的时间戳被封装在 JSON-LD 的传感器数据中
    • 如果 Flink 可以从消息体中提取时间,那么 Kafka 消息的外层封装上的时间戳(如果有的话)就无关紧要了。 Kafka 时间戳用于消息保留和 Kafka Streams 应用程序。但是,我会尝试让 node-red-contrib-rdkafka 在树莓派上编译并增强它以允许手动设置 Kafka 协议时间戳。
    • 更新了答案以反映在 node-red-contrib-rdkafka 中添加的时间戳和键/值消息。我还与 librdkafka 的作者进行了交谈,他认为它应该可以在 Raspberry Pi 上运行,所以我将继续尝试让这个新版本也可以在 rpi 上运行。
    猜你喜欢
    • 2013-09-16
    • 1970-01-01
    • 2021-07-14
    • 2019-04-18
    • 1970-01-01
    • 2020-11-18
    • 2019-08-18
    • 2021-08-17
    • 2013-09-19
    相关资源
    最近更新 更多