【发布时间】:2017-01-04 04:09:54
【问题描述】:
我在队列中有这两种方法。我已经实现了某种形式的背压,由此从方法创建的 observables 只会在用户触发回调时触发事件,所有这些都是通过 observables 实现的。问题是我无法让 onCompleted 处理程序在drain() 的主订阅者中触发。令我惊讶的是 onNext 会为同一个订阅者触发,那么为什么 onCompleted 不会触发呢?我认为在 takeUntil 调用和订阅者中的 onCompleted 处理程序会触发的笨重的 $obs.complete() 之间......
Queue.prototype.isEmpty = function (obs) {
if (!obs) {
// this is just a dummy observable
// I wish Rx had Rx.Observable.dummy() alongside
// Rx.Observable.empty(), but oh well
obs = Rx.Observable.of('dummy');
}
return this.init()
.flatMap(() => {
return obs; // when you call obs.next(), it should fire this chain again
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
})
})
.flatMap(obj => {
return findFirstLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => {
console.log(' => LLLL1 => ', l);
return l;
});
});
})
.filter(l => {
// filter out any lines => only fire event if there is no line
return !l;
})
.map(() => {
// the queue is now empty
obs.complete(); // <<<<<<<<<< note this call
return {isEmpty: true}
});
};
Queue.prototype.drain = function (obs, opts) {
opts = opts || {};
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
let $obs = obs.takeUntil(this.isEmpty(obs))
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => l);
});
});
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
return $obs;
};
什么是绝对的疯狂,是我不能让 onCompleted 回调触发,当我这样调用上面的:
const q = new Queue();
const obs = new Rx.Subject();
q.drain(obs).subscribe(
function (v) {
console.log('end result => ', colors.yellow(util.inspect(v)));
setTimeout(function () {
// the following call serves as the callback which will fire the observables in the methods again
obs.next();
}, 100);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
);
obs.subscribe(
function (v) {
console.log('next item that was drained => ', v);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this gets called!
console.log(colors.red(' => obs on completed => '), c);
}
);
当我调用上述内容时,我得到了这个:
next item that was drained => foo foo foo
next item that was drained => bar bar bar
=> obs on completed => undefined
我之所以只得到这 3 行,是因为我这样做:
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
但是为什么不会显式调用$obs.complete();触发这个回调:
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
?
【问题讨论】:
标签: javascript node.js rxjs5