【问题标题】:How to return an observable.fromEvent from a mongo cursor query?如何从 mongo 游标查询中返回 observable.fromEvent?
【发布时间】:2018-01-16 18:38:20
【问题描述】:

我有一个函数,它执行查询并从查询返回的光标事件中返回一个 observable:

exports.query_tokens = (db) => {
  var req = db.collection('collectionName').find({});
  return Rx.Observable.fromEvent(req, 'data');
}

我是这样使用它的:

...
do(mongo_functions.query_tokens).
subscribe(console.log);

但我在控制台中得到了这个:

Db {
nodejs                    |   domain: null,
nodejs                    |   _events: {},
nodejs                    |   _eventsCount: 0,
nodejs                    |   _maxListeners: undefined,
nodejs                    |   s: 
nodejs                    |    { databaseName: 'myDatabase',
nodejs                    |      dbCache: {},
nodejs                    |      children: [],
nodejs                    |      topology: 
nodejs                    |       Server {
nodejs                    |         domain:
...

如您所见,它们不是我的文件。我做错了什么?

如您所见,Curso 实际上触发了一个名为 data 的事件:http://mongodb.github.io/node-mongodb-native/3.0/api/Cursor.html#event:data

【问题讨论】:

    标签: node.js mongodb rxjs reactivex


    【解决方案1】:

    我发现使用下面的代码更方便

    export function findObs(collection: Collection<any>, queryConditions?: any) {
        const queryObj = queryConditions ? queryConditions : {};
        const queryCursor = collection.find(queryObj);
        return Observable.create((observer: Observer<Array<ObjectID>>): TeardownLogic => {
                                queryCursor.forEach(
                                    doc => observer.next(doc),
                                    () => observer.complete()
                                )
                            })
    }
    

    原因是Observable.from方法忽略了光标的"complete"事件,因此永远无法进入订阅者的"onComplete"函数.

    另一方面,使用Observable.create方法可以控制光标的完成,因此也可以触发订阅者的"onComplete"函数。

    【讨论】:

    • 您的回答效果很好而且很简单。我添加了对 MongoError 的检查,而不是简单地完成。
    【解决方案2】:

    do 操作符接收 observable 的 nexterrorcomplete 通知,但对 observable 没有影响。也就是说,从do 运算符的next 函数返回的任何值都将被忽略。因此,传递给subscribe 的函数会收到Db

    您很可能希望使用switchMap 而不是do,将可观察的事件展平到可观察的流中:

    ...
    .switchMap(mongo_functions.query_tokens)
    .subscribe(console.log);
    

    【讨论】:

    • 谢谢,它成功了。我认为至少订阅应该打印 Observable 对象。为什么没有?
    • 因为do的返回值被忽略了。如果您使用了map,则将打印事件可观察对象。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-05-16
    • 1970-01-01
    相关资源
    最近更新 更多