【问题标题】:Mongodb Tailable Cursor in nodejs, how to stop streamnodejs中的Mongodb Tailable Cursor,如何停止流
【发布时间】:2014-11-12 08:03:44
【问题描述】:

我使用下面的代码从 mongodb capped collection 中获取数据

函数监听(条件,回调){

        db.openConnectionsNew( [req.session.client_config.db] , function(err, conn){

            if(err) {console.log({err:err}); return next(err);}

            coll = db.opened[db_name].collection('messages');

        latestCursor = coll.find(conditions).sort({$natural: -1}).limit(1)
        latestCursor.nextObject(function(err, latest) {
            if (latest) {
                conditions._id = {$gt: latest._id}
            }
            options = {
                tailable: true,
                awaitdata: true,
                numberOfRetries: -1
            }
            stream = coll.find(conditions, options).sort({$natural: -1}).stream()
            stream.on('data', callback)
        });

        });
    }

然后我使用 sockets.broadcast(roomName,'data',document);

在客户端

io.socket.get('/get_messages/', function(resp){
});
io.socket.on('data', function notificationReceivedFromServer ( data ) {
  console.log(data);

});

这非常有效,因为我可以看到插入到 db 中的任何新文档。

我可以在 mongod -verbose 中看到,每 200 毫秒后会有一个使用查询 {$gt:latest_id} 运行的查询,这很好,但我不知道如何关闭这个查询 :( 我很新nodejs 和第一次使用 mongodb tailable 选项,我完全迷路了,非常感谢任何帮助或线索

【问题讨论】:

    标签: javascript node.js mongodb mongodb-query


    【解决方案1】:

    Cursor 对象从.find() 返回的.stream() 方法返回的是node transform stream 接口的实现。具体来说,这是一个“可读”流。

    因此,只要接收到新数据并在要读取的流中可用,就会发出它的“数据”事件。

    还有其他方法,例如.pause().resume(),可用于控制这些事件的流程。通常,您会将这些“内部”称为“数据”事件回调,您希望确保在处理“下一个”数据事件之前执行该回调中的代码:

    stream.on("data", function(data) {
       // pause before processing
       stream.pause();
    
      // do some work, possibly with a callback
      something(function(err,result) {
    
          // Then resume when done
          stream.resume();
      });
    });
    

    但这当然只是“范围界定”的问题。因此,只要“流”变量定义在另一段代码可以访问它的范围内,那么您可以随时调用任一方法。

    同样,通过范围界定,您可以在代码中的任何位置“取消定义”“流”对象,从而使“事件处理”变得多余。

    // Just overwrite the object
    scope = undefined;
    

    所以值得知道。事实上,较新的“2.x 版”节点驱动程序将“流接口”直接包装到标准的Cursor 对象中,而无需调用.stream() 进行转换。节点流是非常有用和强大的东西,在接受它们的使用时非常值得。

    【讨论】:

    • 非常感谢尼尔 :) 终于我可以停止直播了。只剩下一件事了,流的数据事件只有在集合中添加记录时才会被激活:(这意味着如果没有在数据库中添加记录,那么我没有可以关闭流的事件..但是你确实回答了我关于如何停止流的问题,所以再次感谢
    • @Arian 您在这里的解释不正确。 “流”只是定义事件侦听器的对象。再次阅读并理解“范围”。只要该对象是范围内的或可以从其他地方访问,那么您可以“暂停”或只是取消定义该对象,这样就不会发生其他任何事情。
    • 我将写下我如何检查的步骤 1. 我运行我的代码 2. mongod 正在使用 vrebose 选项运行,所以我能够看到我已经流式传输的 monodb 查询 3. 我再次更新我的代码并添加了一个函数 function clearStream(){ setTimeout(function(){ stream.paused=true; }, 10000);如果我把这个函数放在 on('data') 事件中它可以工作,但是如果我只是在外面调用这个函数......代码确实被执行但没有任何反应,我能够看到查询仍在 mongod 控制台中运行:(
    • 好吧,你是对的,该功能可以在任何地方工作,但只有在数据事件中接收到任何数据时,我仍然需要弄清楚即使没有收到数据,我如何才能使这项工作,所以我可以停止在后台运行的流查询
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-03-13
    • 1970-01-01
    • 1970-01-01
    • 2020-07-05
    • 1970-01-01
    • 1970-01-01
    • 2020-12-30
    相关资源
    最近更新 更多