【问题标题】:RxJS5 => subscriber's onCompleted callback not firingRxJS5 => 订阅者的 onCompleted 回调未触发
【发布时间】:2017-01-04 04:09:54
【问题描述】:

我在队列中有这两种方法。我已经实现了某种形式的背压,由此从方法创建的 observables 只会在用户触发回调时触发事件,所有这些都是通过 observables 实现的。问题是我无法让 onCompleted 处理程序在drain() 的主订阅者中触发。令我惊讶的是 onNext 会为同一个订阅者触发,那么为什么 onCompleted 不会触发呢?我认为在 takeUntil 调用和订阅者中的 onCompleted 处理程序会触发的笨重的 $obs.complete() 之间......

Queue.prototype.isEmpty = function (obs) {

    if (!obs) {
        // this is just a dummy observable
        // I wish Rx had Rx.Observable.dummy() alongside
        // Rx.Observable.empty(), but oh well
        obs = Rx.Observable.of('dummy');
    }

    return this.init()
        .flatMap(() => {
            return obs; // when you call obs.next(), it should fire this chain again
        })
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(this, obj)
                })
        })
        .flatMap(obj => {
            return findFirstLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(() => {
                            console.log(' => LLLL1 => ', l);
                            return l;
                        });
                });
        })
        .filter(l => {
            // filter out any lines => only fire event if there is no line

            return !l;
        })
        .map(() => {
            //  the queue is now empty
            obs.complete(); // <<<<<<<<<< note this call
            return {isEmpty: true}
        });


};


Queue.prototype.drain = function (obs, opts) {

    opts = opts || {};

    const isConnect = opts.isConnect || false;
    const delay = opts.delay || 500;

    let $obs = obs.takeUntil(this.isEmpty(obs))
        .flatMap(() => {
            return this.init();
        })
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(this, obj)
                });
        })
        .flatMap(obj => {
            return removeOneLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(() => l);
                });
        });


    process.nextTick(function(){
        obs.next('foo foo foo');
        $obs.next('bar bar bar');
        $obs.complete();
    });


    return $obs;

};

什么是绝对的疯狂,是我不能让 onCompleted 回调触发,当我这样调用上面的:

const q = new Queue();

const obs = new Rx.Subject();

q.drain(obs).subscribe(

    function (v) {

        console.log('end result => ', colors.yellow(util.inspect(v)));

        setTimeout(function () {
            // the following call serves as the callback which will fire the observables in the methods again
            obs.next();
        }, 100);

    },
    function (e) {
        console.log('on error => ', e);
    },
    function (c) {
        // this never gets called and it is driving me f*cking crazy
        console.log(colors.red(' DRAIN on completed => '), c);
    }

);

obs.subscribe(
    function (v) {
        console.log('next item that was drained => ', v);
    },
    function (e) {
        console.log('on error => ', e);
    },
    function (c) {
        // this gets called!
        console.log(colors.red(' => obs on completed => '), c);
    }
);

当我调用上述内容时,我得到了这个:

next item that was drained =>  foo foo foo
next item that was drained =>  bar bar bar
 => obs on completed =>  undefined

我之所以只得到这 3 行,是因为我这样做:

process.nextTick(function(){
    obs.next('foo foo foo');
    $obs.next('bar bar bar');
    $obs.complete();
}); 

但是为什么不会显式调用$obs.complete();触发这个回调:

 function (c) {
            // this never gets called and it is driving me f*cking crazy
            console.log(colors.red(' DRAIN on completed => '), c);
        }

?

【问题讨论】:

    标签: javascript node.js rxjs5


    【解决方案1】:

    好吧,我想我明白了,这个 RxJS 是多么疯狂的库

    最有可能做正确的事情,你应该使用 take() 或 takeUntil() 或类似的

    所以我这样做了:

    Queue.prototype.drain = function (obs, opts) {
    
        if (!(obs instanceof Rx.Observable)) {
            opts = obs || {};
            obs = new Rx.Subject();
        }
        else {
            opts = opts || {};
        }
    
    
        const isConnect = opts.isConnect || false;
        const delay = opts.delay || 500;
    
        process.nextTick(function () {
            obs.next();
        });
    
    
        let $obs = obs
            .flatMap(() => {
                return this.init();
            })
            .flatMap(() => {
                return acquireLock(this)
                    .flatMap(obj => {
                        return acquireLockRetry(this, obj)
                    });
            })
            .flatMap(obj => {
                return removeOneLine(this)
                    .flatMap(l => {
                        return releaseLock(this, obj.id)
                            .map(() => ({data: l, cb: obs.next.bind(obs)}));
                    });
            })
            //  here is the key part!
            .takeUntil(this.isEmpty(obs));
    
    
        return $obs;
    
    };
    

    这似乎成功了。有一段时间我很绝望。如果您想进一步了解其工作原理,请在内部咨询。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-06-16
      • 1970-01-01
      • 2010-12-21
      • 1970-01-01
      • 1970-01-01
      • 2018-03-21
      • 1970-01-01
      • 2019-12-30
      相关资源
      最近更新 更多