【问题标题】:kafka consumer and async handlerkafka 消费者和异步处理程序
【发布时间】:2019-01-20 05:40:45
【问题描述】:

我有一个带有 Kafka 订阅者的 node.js 应用程序。 订阅处理程序使用“fetch”来调用远程 REST API(等待 fetch(...))。

我尝试处理高频率的消息,REST 调用因远程服务器过载而失败。

发生过载是因为订阅者处理程序是异步的。

我的问题是: 有没有办法确保异步处理程序被序列化,因此不会同时调用远程 API 服务器?

克里斯: 我正在使用 kafka 节点

这是一个代码示例:

const consumer = new Consumer(this.client, [{ topic: topicKey}]);
consumer.on('message', function (message) {
  handleMessage(message)
});

async function handleMessage(message) {
   ... decode the message

  // Send to the Remote server using a REST call

  //=> the task is suspended, waiting for the IO, so, meantime, the next message
  //   is processed, and I flood the remote server of POST requests.
  await fetch(...);
}

谢谢。

【问题讨论】:

  • 你用的是什么客户端?您能否提供一些代码来说明您的问题? stackoverflow.com/help/how-to-ask
  • 嗨,克里斯。我正在使用 kafka 节点。
  • 我只是在问题中添加代码。
  • 也许你应该使用队列?如果你想要一个例子,我可以用它来回答
  • 您的 REST API 是否提供将数据批量发送到端点的选项?如果是这种情况,我会在一个列表中批处理并定期发送该批处理。

标签: node.js apache-kafka kafka-consumer-api


【解决方案1】:

我不确定你想要实现什么。我了解您的 API 已超载,因为您同时调用它太多。

所以,如果我的理解不错,你想同步做。

正如我在评论中所说,我认为队列是一个不错的选择。这是我的做法(您可能会找到一种更好的方法来在其他地方实现队列,但我只是给您一个想法:D)

const consumer = new Consumer(this.client, [{ topic: topicKey}]);
const myQueue = [];

consumer.on('message', function (message) {
    myQueue.push(message);
});

async function consumeQueue(){
    const message = myQueue.shift();

    if(!message){
        await sleep(3000);
    } else {
        // ... decode your message
        await fetch(message)
    }

    consumeQueue();
}

function sleep(ms){
    return new Promise(resolve => setTimeout(resolve, ms));
}

// you have to init it :D
consumeQueue();

【讨论】:

  • 谢谢,我做了一些接近你的解决方案的事情。它有效,但我仍然不清楚如何使用异步消费者限制 Kafka 的消息流。我还将尝试使用 topic.pause 和 topic.resume 来尝试控制消息流。感谢您的帮助。
【解决方案2】:

如果您只想暂停“流程”,直到执行异步代码。以下方法可以很好地工作,因为可读流(ConsumerGroupStream)可以暂停并再次恢复。

const kafka = require('kafka-node') 常量选项 = { kafkaHost: '127.0.0.1:9092', groupId:'组' }; const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']); 消费者组流 .on('数据', (按摩) => { consumerGroupStream.pause(); console.log('消息:',按摩,Date.now()); asyncFunction().then(() => { console.log('现在数据将再次开始流动。', Date.now()); consumerGroupStream.resume(); }); });

第二个选项是使用转换流 https://nodejs.org/api/stream.html#stream_class_stream_transform

const kafka = require('kafka-node') 常量选项 = { kafkaHost: '127.0.0.1:9092', groupId:'组' }; const consumerGroupStream = new kafka.ConsumerGroupStream(options, ['queue']); 异步函数 asyncFunction(消息){ console.log('消息:',消息); } const Transform = require('stream').Transform 常量 messageTransform = new Transform({ 对象模式:真, 解码字符串:真, 转换(消息,编码,完成){ asyncFunction(消息).then(() => { 完毕() }) } }) 消费者组流 .pipe(消息转换)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-01-24
    • 1970-01-01
    • 1970-01-01
    • 2021-04-30
    • 1970-01-01
    • 2019-11-25
    • 2021-09-22
    • 2019-12-31
    相关资源
    最近更新 更多