【问题标题】:RxJs catch error and continueRxJs 捕获错误并继续
【发布时间】:2016-07-29 02:31:09
【问题描述】:

我有一个要解析的项目列表,但其中一个项目的解析可能会失败。

捕捉错误但继续执行序列的“Rx-Way”是什么

代码示例:

var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
  function(value){
      if(value == 3){
        throw new Error("Value cannot be 3");
      }
    return value;
  });

observable.subscribe(
  function(value){
  console.log("onNext " + value);
  },
  function(error){
    console.log("Error: " + error.message);
  },
  function(){
    console.log("Completed!");
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

我想以非 Rx 方式做的事情:

var items = [0,1,2,3,4,5];

for (var item in items){
  try{
    if(item == 3){
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  }catch(error){
     console.log("Error: " + error.message);
  }
}

提前致谢

【问题讨论】:

  • 参见 Rx.Observable.onErrorResumeNext
  • 看到了,但不能让它这样工作……你能写一个例子吗?
  • 请参阅Ben Leshs talk about this,尤其是在 23:43。

标签: javascript rxjs reactivex


【解决方案1】:

我建议你改用flatMap(现在是 rxjs 版本 5 中的mergeMap),如果你不关心它们,它会让你折叠错误。实际上,您将创建一个内部 Observable,如果发生错误,可以将其吞下。这种方法的优点是您可以将运算符链接在一起,如果管道中的任何地方发生错误,它将自动转发到 catch 块。

const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  //This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing")
    //Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
)

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) => 
    iif(() => value != 3, 
      of(value), 
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext " + value), (error) => console.log("Error: " + error.message), () => console.log("Completed!")
);
&lt;script src="https://unpkg.com/rxjs@7.0.0/dist/bundles/rxjs.umd.min.js"&gt;&lt;/script&gt;

【讨论】:

  • 如果在flatMap之后的某处发生错误怎么办?
  • @nonybrighto 然后它将终止流并向订阅者发出错误,除非下游某处有另一个捕获。
  • 这似乎不适用于 RxJS v6。我尝试迁移代码,并且在捕获错误后完成了 observable。您介意更新您的解决方案吗?我在这里创建了一个 stackblitz 供参考。 Stackblitz reproducable code.
  • @ManishShrestha 你去!
  • 喜欢这个答案 - 裸 pipe 函数、工作 rxJs 示例和 const 解构导入。每天都是上学日! - 哦,对于iif
【解决方案2】:

您需要切换到一个新的一次性流,如果其中发生错误将被安全地释放,并保持原始流处于活动状态:

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"&gt;&lt;/script&gt;

此博客文章中有关此技术的更多信息:The Quest for Meatballs: Continue RxJS Streams When Errors Occur

【讨论】:

  • 使用 lettable 和 pipe 操作符会怎样?
  • 小心,如果内部 observable 需要一些时间来发射它可能永远不会在使用 switchMap() 时发射,因为它取消了之前的 observable。要自己测试它,请将此答案的Rx.Observable.of(value) 替换为Rx.Observable.from(new Promise((resolve, reject) =&gt; setTimeout(() =&gt; resolve(i))))。结果只会是:“成功 5,完成”。将switchMap() 替换为mergeMap() 可以解决问题并正常给出预期结果。
【解决方案3】:

为了防止您的endlessObservable$ 死亡,您可以将您的failingObservable$ 放在一个高阶映射运算符中(例如switchMapconcatMapexhaustMap...)并通过终止带有empty() observable 的内部流不返回任何值。

使用 RxJS 6:

endlessObservable$
    .pipe(
        switchMap(() => failingObservable$
            .pipe(
                catchError((err) => {
                    console.error(err);
                    return EMPTY;
                })
            )
        )
    );

【讨论】:

【解决方案4】:

您实际上可以在 ma​​p 函数中使用 try/catch 来处理错误。这是代码sn-p

var source = Rx.Observable.from([0, 1, 2, 3, 4, 5])
    .map(
        function(value) {
            try {
                if (value === 3) {
                    throw new Error("Value cannot be 3");
                }
                return value;

            } catch (error) {
                console.log('I caught an error');
                return undefined;
            }
        })
    .filter(function(x) {
        return x !== undefined; });


source.subscribe(
    function(value) {
        console.log("onNext " + value);
    },
    function(error) {
        console.log("Error: " + error.message);
    },
    function() {
        console.log("Completed!");
    });
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"&gt;&lt;/script&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-15
    • 2014-09-11
    相关资源
    最近更新 更多