【问题标题】:Chaining Subjects - RxJS / Angular2链接主题 - RxJS / Angular2
【发布时间】:2017-01-20 08:59:20
【问题描述】:

我对可观察对象非常很陌生,我正在努力思考一种链接主题的好方法。本质上,我正在尝试获取一组将发出相同类型的可观察对象并将它们链接在一起,这样,当我在第一个主题上调用 next 时,每个后续主题(假设两者之间没有 error 发生)有机会获取该值,对其进行操作,并将其传递给下一个主题,直到它到达最后一个主题,这将发出最终结果。

我已经推出了自己的课程来处理这个问题,但似乎这种特殊情况会一直出现可观察对象,所以我想知道是否有人知道 RxJS 中已经内置的任何东西或执行此操作的 Angular2。

另外,我是否试图强迫对象做他们不应该做的事情?有没有更好的方法来将算法链接在一起,这样每个函数都有机会在最终返回之前依次操作输入,如果我需要,可能会出错?在这种情况下,什么被认为是“最佳做法”?

编辑

为了更清楚我在说什么,这就像我正在寻找的东西:

var wrapper = Subject.chain(subject1, subject2, subject3)

// Subscriptions happen here

/** 
 * This calls subject1.next("HI"), 
 * which then calls subject2.next() with the result of subject1's manipulation of "HI",
 * which then calls subject3.next() with the result of subject2's manipulation of the subject1's manipulation of "HI",
 * which then emits the result of subject3's manipulation of subject2's manipulation of subject1's manipulation of "HI"
 */
wrapper.next("HI"); 

解决方法

如果将来有人发现这个问题,这是我在 Array.reduce 函数的帮助下使用的解决方法。它并不完美,但对我有用:

chain<T>(source: Observable<T>, destination: Subject<T>): Observable<T>
{
    let processed = false;
    return source.catch(
        err => {
            let ret = new ReplaySubject<T>(1);
            destination.first().subscribe(ret);
            processed = true;
            destination.error(err);
            return ret;
        }
    ).finally(
        () => {
            // TODO: Allow sources to not propagate complete status
            !processed && destination.complete();
            processed = true;
        }
    ).flatMap(
        (next: T) => {
            let ret = destination;
            if(!processed)
            {
                ret = new ReplaySubject<T>(1);
                destination.first().subscribe(ret);
                destination.next(next);
            }
            processed = false;
            return ret;
        }
    );
}

【问题讨论】:

    标签: angular rxjs observable


    【解决方案1】:

    要链接可观察对象,您应该考虑使用flatMapswitchMap 等运算符。

    这是一个使用 HTTP 的示例:

    this.http.get('...')
        .map(res => res.json())
        .flatMap(data => {
          // receive the result of the first request
          // use this result to execute another one
          return this.http.get('...')
                  .map(res => res.json());
        }).subscribe(data => {
          // receive the result of the second request
        });
    

    如果你对这方面的好教程感兴趣,你可以看看这个:

    【讨论】:

    • 当你有一个任意长度的列表时,有没有办法做到这一点?我应该创建一个递归函数来处理它吗?
    • 您将如何创建您的列表?你想串行还是并行执行?您要汇总结果还是只汇总最后一个?
    • 我想按顺序执行。一旦一个 observable 有结果,它就会将它传递给下一个 observable 以进行操作,依此类推。最后应该有一个值。在我的特殊情况下,结果类型始终是一致的,但我认为不一定需要这样。我希望它是一个结构,我可以从一个地方传递到另一个地方,它的行为就像一个主题,这样,当你发出一些东西时,包装结构会调用底层的主题并发出链的结果。这有意义吗?
    【解决方案2】:

    您可以使用订阅来链接主题。

    subject1
        .subscribe( subject2 );
    
    subject2
        .subscribe( subject3 );
    
    subject3
        .subscribe( x=> {
            // do you want to do
        });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-08-18
      • 1970-01-01
      • 2016-11-27
      • 1970-01-01
      • 2023-03-21
      • 1970-01-01
      • 2016-05-18
      • 2016-12-09
      相关资源
      最近更新 更多