【问题标题】:How to get events from one stream which happened after the last event from other stream如何从另一个流的最后一个事件之后发生的一个流中获取事件
【发布时间】:2016-07-27 11:12:02
【问题描述】:
有两个流,永远不会完成:
--a---b-c-d---e--f---->
-----1-------2------3-->
我想从第二个流的最后一个事件之后发生的第一个流中获取事件。
一开始是这样的:
--a->
(?)
---->
(=)
--a->
在第二个流发出第一个事件后,它看起来像这样:
--a---b-c-d->
(?)
-----1------>
(=)
------b-c-d->
在第二个流中的新事件之后:
--a---b-c-d---e--f->
(?)
-----1-------2----->
(=)
--------------e--f->
等等……需要哪一组操作员来做这件事?
【问题讨论】:
标签:
javascript
angular
reactive-programming
rxjs
rxjs5
【解决方案1】:
你可以使用CombineLatest来返回你想要的事件,例如:
/* Have staggering intervals */
var source1 = Rx.Observable.interval(1000)
.map(function(i) {
return {
data: (i + 1),
time: new Date()
};
});
var source2 = Rx.Observable.interval(3000).startWith(-1)
.map(function(i) {
return {
data: (i + 1),
time: new Date()
};
});
// Combine latest of source1 and source2 whenever either gives a value with a selector
var source = Rx.Observable.combineLatest(
source1,
source2,
function(s1, s2) {
if (s1.time > s2.time) {
return s1.data + ', ' + s2.data;
}
return undefined;
}
).filter(x => x !== undefined).take(10);
var subscription = source.subscribe(
function(x) {
console.log('Next: %s', x);
},
function(err) {
console.log('Error: %s', err);
},
function() {
console.log('Completed');
});
【解决方案2】:
我不是 100% 支持你的弹珠,因为它们看起来有些矛盾,但我会提出一个替代方案,即使用 window 或 buffer 如果你只需要将事件分组在一起第二个来源。
var count = 0;
//Converts the source into an Observable of Observable windows
source1.window(source2).subscribe(w => {
var local = count++;
//Just to show that this is indeed an Observable
//Subscribe to the inner Observable
w.subscribe(x => console.log(local + ': ' + x));
});
请注意,这只会发出来自第一个来源的值,如果您希望它们与其他来源的值结合使用,那么请务必按照其他答案的建议使用 combineLatest 或 withLatestFrom。