【问题标题】:RxJS 6 conditionally pipe an observableRxJS 6 有条件地管道一个 observable
【发布时间】:2018-08-15 13:52:44
【问题描述】:

我有一个现有的函数,它调用 HTTP 端点来处理一些工作,该端点负责管理我的 Angular 组件中的一些逻辑。出现了一个要求,因此在某些情况下,我们需要调用所有现有的逻辑,还需要进行另一个 HTTP 调用。以下是我尝试的注释版本。

public resolveComponent(action: string) {
        //if called with no parameters, don't do anything
        //otherwise, call one of the two endpoints.     
        let preAction = (x: any) => {
            if (action === "replay") {
                return this.remediationService
                           .replayRemediation(this.workflow.Id);
            }
            else if (action === "remove") {
                return this.remediationService
                           .removeRemediation(this.workflow.Id);
            }
            else {
                return of([]);
            }
        }

        this.remediationService
            .resolveComponent(component)
            .pipe(
                //the line below was what I added after new requirement
                mergeMap(preAction),
                mergeMap(x => {
                    //grabs updated data from server
                    return this.remediationService.remediationComponent(component.Id);
                })
            )
            .subscribe((x: SomeModel) => {
                    //logic happens
                });
}

这按预期执行工作,但这是有条件地链接可观察对象的最佳方式吗?

【问题讨论】:

  • 你可以尝试 switchMap 根据条件返回不同的 observables
  • 他不会使用switchmap,因为他正在等待响应,exhaustmap会是更好的选择。回到主题:我认为这是一种像你一样做的合法方式

标签: angular rxjs


【解决方案1】:

使用iifrxjs。

iif 接受一个条件函数和两个 Observable。当

  • 订阅了算子返回的 Observable,将调用条件函数。

https://www.learnrxjs.io/operators/conditional/iif

【讨论】:

  • 我认为这应该被接受为答案。当您只将一个 observable 传递给 iff() 运算符时,iif() 的使用允许您有条件地将一个 observable 包含到您的订阅中。在 rxjs-dev.firebaseapp.com/api/index/function/iif 的“控制对 Observable 的访问”中查看一个清晰的示例
【解决方案2】:

更新 使用新的 iif rxjs 运算符。 https://www.learnrxjs.io/operators/conditional/iif
感谢@moslem-shahsavan 和 Rx 团队)

您可以添加自己的pipeIf:

import { from, Observable, of, NEVER } from 'rxjs';
import { mergeMap, map, filter } from 'rxjs/operators';

from([10, 11, 12, 13])
  .pipe(
    map(x => x + 10),
    pipeIf(x => x > 21,
      filter(x => x > 22),
      map(x => x + 100)
    ),
)
  .subscribe(console.log);

function pipeIf(predicate, ...pipes) {
  return function (source) {
    return source.pipe(
      mergeMap(value => predicate(value) ? of(value).pipe(...pipes) : of(value))
    )
  }
}

// 20
// 21
// 123

【讨论】:

  • 在管道中传播参数存在问题。了解更多:github.com/ReactiveX/rxjs/issues/3989
  • @Buggy:你能添加打字稿类型吗?另外,传播...pipes 两次正确吗?另外,我认为这不适用于基于时间的运算符。
【解决方案3】:

iif 可以有条件地写 observables:

  • iif(() => cond, obs1, obs2) = cond ? obs1 : obs2
  • iif(() => cond, obs1) = cond ? obs1 : EMPTY

iif 不适用于运营商。如果您想在.pipe() 中有条件地编写运算符,请使用:

cond ? delay(1000) : interval(500)

对于空运算符,使用identity:

import { identity } from 'rxjs'
cond ? delay(1000) : identity

identity 实际上是x => x

【讨论】:

    【解决方案4】:

    您可能只使用merge()(或concat(),如果您需要流是连续的)而不是两个mergeMap(),因为您似乎不需要新流的输出作为输入您现有的信息流:

    const replayObservable = of("Has been Replayed");
    const removeObservable = of("Has been Removed");
    const existingObservable = of(
      "Finally, do what you already did before the CR."
    );
    const resolveComponent = (action?: string) => {
      const newSource = of();
      const targetWithoutNull = newSource.pipe(
        merge(
          iif(
            () => !!action,
            iif(() => action == "replay", replayObservable, removeObservable)
          ),
          existingObservable
        )
      );
      targetWithoutNull.subscribe(x => console.log(x));
    };
    
    resolveComponent("replay");
    resolveComponent("remove");
    resolveComponent();
    

    注意第一个iif()只有2个参数。如果条件指向未定义的参数,iif() 将完成流,因此在这种情况下,只会订阅现有的Observable。

    看到这个运行:https://stackblitz.com/edit/rxjs-7ztk16

    【讨论】:

      【解决方案5】:

      这个对我有用,

      declare module "rxjs" {
          interface Observable<T> {
            pipeIf(validExpression:boolean, truePipe:Array<UnaryFunction<T,T>>, falsePipe:Array<UnaryFunction<T,T>>): Observable<T>
          }
        }
      
        Observable.prototype.pipeIf = function(validExpression:boolean, truePipe:Array<OperatorFunction<any,any>>, falsePipe:Array<OperatorFunction<any,any>>):Observable<any>{
          return validExpression ? this.pipe.apply(this,truePipe) : this.pipe.apply(this, falsePipe)
        }
      

      【讨论】:

        猜你喜欢
        • 2021-01-13
        • 1970-01-01
        • 2017-03-09
        • 1970-01-01
        • 1970-01-01
        • 2021-06-25
        • 1970-01-01
        • 2018-11-09
        • 1970-01-01
        相关资源
        最近更新 更多