【问题标题】:Node Redis XREAD blocking subscription节点 Redis XREAD 阻塞订阅
【发布时间】:2020-09-22 13:17:33
【问题描述】:

我正在将一个 redis 发布/订阅系统转换为 redis 流,这样我就可以为我的服务器发送的事件添加一些容错能力。

订阅传统方式很简单:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

这将永久阻塞,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

切换到流,使用XREAD 代替订阅,使用XADD 代替发布,数据大不相同。我正在努力解决的部分是阻塞。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。

【问题讨论】:

    标签: node.js redis publish-subscribe redis-streams


    【解决方案1】:

    说实话,我只问这个问题是因为 google 对我不好,而且我找不到其他人发布关于这个问题的帖子。希望这会有所帮助!

    所以,XREAD 只是阻塞了初始调用。它会坐下来等待一段设定的时间(或者如果您将时间设置为 0 则无限期地等待),但是一旦它接收到数据,它的职责就被认为已完成,并且它会解除阻塞。要保持“订阅”处于活动状态,您需要再次调用 XREAD,并使用流中的最新 id。这将替换我们传递给它的初始 $ 值。

    递归似乎是一个完美的解决方案:

    const xread = ({ stream, id }) => {
      redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
        if (err) return console.error('Error reading from stream:', err);
    
        str[0][1].forEach(message => {
          id = message[0];
          console.log(id);
          console.log(message[1]);
        });
    
        setTimeout(() => xread({ stream, id }), 0)
      });
    }
    
    xread({ stream: 'asdf', id: '$' })
    
    

    【讨论】:

    • 令人惊讶的是,网络上几乎没有任何关于此的信息......你确定递归是一个完美的解决方案吗?
    • 通过该递归,您将达到最大调用堆栈深度。用setTimeout(( ) => xread(stream, id), 0) 包装递归调用以允许堆栈清除
    • @W2a - 我不知道...如果你有更好的建议,我很想听听!
    猜你喜欢
    • 1970-01-01
    • 2013-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-08
    • 1970-01-01
    • 2021-10-18
    相关资源
    最近更新 更多