【发布时间】:2017-01-20 08:59:20
【问题描述】:
我对可观察对象非常很陌生,我正在努力思考一种链接主题的好方法。本质上,我正在尝试获取一组将发出相同类型的可观察对象并将它们链接在一起,这样,当我在第一个主题上调用 next 时,每个后续主题(假设两者之间没有 error 发生)有机会获取该值,对其进行操作,并将其传递给下一个主题,直到它到达最后一个主题,这将发出最终结果。
我已经推出了自己的课程来处理这个问题,但似乎这种特殊情况会一直出现可观察对象,所以我想知道是否有人知道 RxJS 中已经内置的任何东西或执行此操作的 Angular2。
另外,我是否试图强迫对象做他们不应该做的事情?有没有更好的方法来将算法链接在一起,这样每个函数都有机会在最终返回之前依次操作输入,如果我需要,可能会出错?在这种情况下,什么被认为是“最佳做法”?
编辑
为了更清楚我在说什么,这就像我正在寻找的东西:
var wrapper = Subject.chain(subject1, subject2, subject3)
// Subscriptions happen here
/**
* This calls subject1.next("HI"),
* which then calls subject2.next() with the result of subject1's manipulation of "HI",
* which then calls subject3.next() with the result of subject2's manipulation of the subject1's manipulation of "HI",
* which then emits the result of subject3's manipulation of subject2's manipulation of subject1's manipulation of "HI"
*/
wrapper.next("HI");
解决方法
如果将来有人发现这个问题,这是我在 Array.reduce 函数的帮助下使用的解决方法。它并不完美,但对我有用:
chain<T>(source: Observable<T>, destination: Subject<T>): Observable<T>
{
let processed = false;
return source.catch(
err => {
let ret = new ReplaySubject<T>(1);
destination.first().subscribe(ret);
processed = true;
destination.error(err);
return ret;
}
).finally(
() => {
// TODO: Allow sources to not propagate complete status
!processed && destination.complete();
processed = true;
}
).flatMap(
(next: T) => {
let ret = destination;
if(!processed)
{
ret = new ReplaySubject<T>(1);
destination.first().subscribe(ret);
destination.next(next);
}
processed = false;
return ret;
}
);
}
【问题讨论】:
标签: angular rxjs observable