【问题标题】:Returning error early from a function that returns an observable从返回可观察对象的函数中提前返回错误
【发布时间】:2017-09-07 07:55:09
【问题描述】:

我有一个函数和一个验证辅助函数,可以将 tsv 文件作为字符串读取。我正在尝试通过使用 rxjs 执行同步任务来练习我的反应式编程。我知道这可能不是它的完美用例,但我知道它应该是可能的。

这是最简单形式的伪代码:

public readTsv(tsv: string): Observable<TsvAsObject> {
  if (!tsv || tsv.length < 1) {
    return Observable.throw('null or empty tsv file passed');
  }
  // check tsv headers
  this.validHeadersObs(tsv.split('\n')[0]) // pass the first line of the tsv string
    .filter(valid => valid === false)
    .subscribe(() => {
      return Observable.throw('invalid tsv headers')
    });

  // omitted logic that processes the tsv and then returns it as an observable

  return Observable.of(tsvAsObject);
}

我已经测试了我的 validHeaders() 函数,我知道它可以工作。它返回检查每个列标题的布尔值流。一旦一对头文件不匹配,它就会返回一个 Observable.of(false):

private validHeadersObs(headerLine: string): Observable<boolean> {
  const headers$ = Observable.from(headerLine.split('\t'));
  const validHeaders = ['columnName1', 'columnName2', 'columnName3'];

  return headers$
    .mergeMap((value, index) => Observable.of(value === validHeaders[index]));
}

问题是 readTsv() 不返回 Observable.throw('invalid tsv headers') 因为我不确定如何从订阅中的函数提前返回。有没有办法从箭头函数内部“双重返回”?

【问题讨论】:

    标签: error-handling rxjs reactive-programming


    【解决方案1】:

    好的,看看你想要做什么,我看到了一个危险信号:在一个将返回一个可观察对象的函数中,可能不应该有一个 subscribe 调用 除非 它在另一个 Observable 的内部。

    你要做的是:

    1. 尽快抛出参数验证错误。如果您选择,这使您能够同步处理它,这也意味着如果您在 RxJS 中使用它(例如,在 switchMap 中),它仍然会沿着 observable 的错误通道传播。
    2. 将您的validHeadersObs 呼叫链接到您要执行的下一个操作。这实际上与你需要对承诺做的事情没有什么不同(认为validate.then(returnAthing)validate.map(returnAthing) 在这种情况下并没有太大不同)。

    您当前代码中的问题是您开始这项工作的是 订阅,它是异步的,所以它不会返回,然后你继续 返回可观察到的“快乐路径”。如果返回“幸福路径” observable 取决于您的验证调用的异步反馈, 你需要把它锁起来。

    (小记,如果任何 tsv 处理逻辑碰巧是异步的,你会想要使用 switchMap 而不是 map

    应该这样做:

    public readTsv(tsv: string): Observable<TsvAsObject> {
      if (!tsv || tsv.length < 1) {
        // throw an error as soon as you can
        throw new TypeError('null or empty tsv file passed');
      }
    
      // check tsv headers
      return this.validHeadersObs(tsv.split('\n')[0]) 
        .map(valid => {
          if (!valid) {
            throw new Error('invalid tsv headers');
          }
    
          // omitted logic that processes the tsv here
    
          return tsvAsObject
        });
    }
    

    【讨论】:

      【解决方案2】:

      我建议如下:

      保持非异步函数简单没有Rx逻辑

      如果您尝试在 Rx 异步流中执行所有操作,这将增加认知超载并分散您想要解决的实际问题的注意力。所以即使你想练习 Rx,也不要过度。

      private allHeadersAreValid(headerLine: string): boolean {
        const validHeadersInOrder = ['columnName1', 'columnName2', 'columnName3'];
      
        return headerLine.split('\t')
          .map((field, index) => field === validHeaders[index])
          .reduce((acc, curr) => acc && curr, true);
      }
      

      没有subscribe()内部函数,只有Observables

      通过在函数内部订阅,您可以放松 Observables 的惰性,并且已经在执行可能永远不需要的代码。因为我们现在已将验证设置为同步功能,您可以轻松地对其进行测试,并在实际工作之前使用它来测试先决条件。

      public readTsv(tsv: string): Observable<TsvAsObject> {
        if (!tsv || tsv.length < 1) {
          return Observable.throw('null or empty tsv file passed');
        }
      
        if(!allHeadersAreValid(tsv)) {
          return Observable.throw('invalid tsv headers');
        }
      
        // omitted logic that processes the tsv and then returns it as an observable
      
        return Observable.of(tsvAsObject);
      }
      

      上述代码示例的缺点是您仍然在实际 subscribing 之前工作,因为 allHeadersAreValid 已经被评估。为了让一切再次变得懒惰,您可以使用Defer() 将代码的执行推迟到实际订阅时刻。

      public readTsv(tsv: string): Observable<TsvAsObject> {
        return Observable.defer(() => {
          if (!tsv || tsv.length < 1) {
            return Observable.throw('null or empty tsv file passed');
          }
      
          if(!allHeadersAreValid(tsv)) {
            return Observable.throw('invalid tsv headers');
          }
      
          // omitted logic that processes the tsv and then returns it as an observable
      
          return Observable.of(tsvAsObject);
        });
      }
      

      现在您的验证函数是同步的并且易于测试,但您的 readTsv 函数仍然是 Observable 并且像预期的那样是惰性的。

      【讨论】:

      • 对于我写的答案,它是否与您使用Observable.defer() 的解决方案一样“懒惰”?
      【解决方案3】:

      感谢 this 关于 Angular http 服务的问题,我通过使用 Observable.create() 创建自定义 Observable 解决了这个问题。我认为这被称为使用更高阶的 observable?无论如何...

      public buildMenusFromTsv(tsv: string): Observable<any> {
        return Observable.create((observer: Observer<any>) => {
          Observable.of((tsv && tsv.length > 0) as boolean)
            .filter(valid => valid === false)
            .map(() => Observable.throw('null or empty tsv file passed')
            .subscribe(err => observer.error(err));
      
          // check tsv headers
          this.validHeadersObs(tsv.split('\n')[0])
            .filter(valid => valid === false)
            .map(() => Observable.throw('invalid tsv headers')
            .subscribe(err => observer.error(err));
      
          // omitted logic that processes the tsv and then returns it as an observable
      
          observer.next(Observable.of(tsvAsObject));
          observer.complete();
        });
      }
      

      奖励:我找到了一个时髦的运算符来比较可观察的流,而不是笨拙的 mergeMap()

      private validHeadersObs(headerLine: string): Observable<boolean> {
        const headers$ = Observable.from(headerLine.split('\t'));
        const validHeaders$ = Observable.from(['columnName1', 'columnName2', 'columnName3']);
      
        return headers$.sequenceEqual(validHeaders$);
      }
      

      【讨论】:

        猜你喜欢
        • 2018-04-11
        • 2018-12-16
        • 2018-02-01
        • 2017-07-12
        • 2020-12-10
        • 1970-01-01
        • 2019-02-18
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多