【问题标题】:RxJS retry not behaving as expected?RxJS 重试未按预期运行?
【发布时间】:2018-12-18 15:02:30
【问题描述】:

我在让 RxJS 的重试运算符与 map 运算符结合使用时遇到了一些困难。

基本上我要做的是获取一个值流(在本例中为 GitHub 用户用于测试目的),并为每个值对该值执行一些操作。

但是,我希望能够以允许我为该操作设置有限重试次数的方式处理错误。我还假设 RxJS 异步处理这样的事情。

Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll(x => Rx.Observable.of(x))
.map(x => {
    if(x.id % 5 == 0) {
        console.log("error");
        return Rx.Observable.throw("error");
    }
    return x;
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));

这是我到目前为止所得到的,但是它应该重试相同的值 3 次,它只会打印一次“错误”,这不是我正在寻找的行为。

我是否遗漏了一些琐碎的事情,或者 RxJS 根本不可能做到这一点?简而言之,我想对值异步执行操作,将所述操作重新绑定有限次,但允许其他有效操作并行继续。我只使用 promises 实现了类似的东西,但代码相当混乱,我希望 RxJS 能让我让事情变得更好读。

【问题讨论】:

    标签: javascript rxjs reactive-programming


    【解决方案1】:

    编辑 - 为清晰起见删除早期失败的解决方案(某些 cmets 可能不再相关)

    要重试单个失败,您需要中断单个请求并重试这些请求。

    为了允许正确处理错误,请将流拆分为每个预期结果的子流。

    我还添加了一些代码以允许一次重试成功。

    const Observable = Rx.Observable
    
    // Define tests here, allow requestAgain() to pass one
    const testUser = (user) => user.id % 5 !== 0
    const testUserAllowOneSuccess = (user) => user.id === 5 || user.id % 5 !== 0
    
    const requestAgain = (login) => Observable.of(login)
      .switchMap(login => jQuery.getJSON("https://api.github.com/users/" + login ))
      .map(x => {
        if(!testUserAllowOneSuccess(x)) {
          console.log("logging error retry attempt", x.id);
          throw new Error('Invalid user: ' + x.id)
        }
        return x;
      })
      .retry(3)
      .catch(error => Observable.of(error))
    
    const userStream = Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
      .concatAll()
    
    const passed = userStream.filter(x => testUser(x))
    const failed = userStream.filter(x => !testUser(x))
      .flatMap(x => requestAgain(x.login))
    
    const retryPassed = failed.filter(x => !(x instanceof Error))
    const retryFailed = failed.filter(x => (x instanceof Error))
      .toArray()
      .map(errors => { throw errors })
    
    const output = passed.concat(retryPassed, retryFailed)
    
    output.subscribe(
      x=> console.log('subscribe next', x.id ? x.id : x), 
      e => console.log('subscribe error', e)
    );
    

    工作示例CodePen

    【讨论】:

    • 完美,这正是我正在寻找的解决方案。非常感谢!
    • 我已经用这个解决方案做了一些进一步的测试,似乎遇到了一些奇怪的问题。在 chrome 中刷新缓存时,它不会尝试获取所有用户,仅在刷新页面后大约 5 或 6 个用户,然后它会决定获取所有大约 40 个用户(或者对于 api 来说页面大小有多大)。我不确定的另一件事是,无论有多少实际失败,“无效用户”错误消息只会显示一次。
    • 确实如此。我想知道是否是事件的异步而不是缓存导致了它。我的理论是 id #5 的错误抛出(在 n 次重试之后)停止了主序列的死机。通常,主列表完全得到输出,因为它在内存中并且可以击败 3 个 HTTP requestAgain 调用,但有时不能。所以看来我们需要一个替代throw
    • 另外,当截断发生时,我看到 #3 丢失并且 #4 在序列中被延迟。我希望成功的项目会按顺序出现。
    • 再想,抛出错误肯定是错误的处理方式。可观察流的基本定义是它发出直到错误或完成,并且重试是主序列的一部分,由于映射内部的return x,这是在重试尝试成功的情况下所必需的。这可能也意味着.retry()不能用于重试机制。
    【解决方案2】:

    我发现有几个问题可能会导致您遇到问题:

    1) 在你的 map 语句中,你通过返回一个 observable 来进行抛出。如果成功案例还返回了一个您正在展平的可观察对象(使用 concat、switch 或合并),那么这将是合适的,但是您的成功案例只是发出一个值而从不展平。因此,当返回 Throw observable 时,它​​永远不会被展平,因此只会通过并记录 observable 对象。

    相反,你可以只做一个普通的 javascript throw 'error'(尽管考虑抛出一个错误对象而不是一个字符串)

    2) 当重试出错时,它会重新订阅。如果您使用 observables 进行获取,这将意味着它将再次进行获取。但是,您使用 Promise 进行了获取,然后仅从该 Promise 中创建了一个 observable。因此,当您重新订阅 observable 时,它​​将最终使用与第一次相同的 Promise 对象,并且该 Promise 已经处于已解决状态。不会进行额外的提取。

    要解决此问题,您需要有一个可观察的版本来进行提取。如果你需要从头开始,它可能看起来像这样:

    Rx.Observable.create(observer => {
      jQuery.getJSON("https://api.github.com/users")
        .then(result => {
          observer.next(result);
          observer.complete();
        }, err => {
          observer.error(err);
          observer.complete();
        });
    })
    

    如果您打算经常这样做,我建议您创建一个为您进行包装的函数。

    编辑:

    我的最终目标是基本上能够获取一组 api url,并使用每个 URL 启动 get 请求。如果 get 请求由于某种原因失败,它应该总共尝试 3 次才能再次到达该端点。但是,此类获取请求的失败不应阻止其他请求的发生,它们应该并行发生。此外,如果一个 get 请求失败 3 次,它应该被标记为失败并且它不应该停止整个过程。到目前为止,我还没有找到使用 RxJS 的方法。

    然后我会做这样的事情:

    function getJson (url) {
      return Rx.Observable.create(observer) => {
        jQuery.getJSON(url)
          .then(result => {
            observer.next(result);
            observer.complete();
          }, err => {
            observer.error(err);
            observer.complete();
          });
      }
    }
    
    const endpoints = ['someUrl', 'someOtherUrl'];
    const observables = endpoints.map(endpoint => 
      return getJson(url)
        .retry(3)
        .catch(err => Rx.Observable.of('error'));
    });
    
    Rx.Observable.forkJoin(...observables)
      .subscribe(resultArray => {
        // do whatever you need to with the results. If any of them
        //   errored, they will be represented by just the string 'error'
      });
    

    【讨论】:

    • 我的最终目标是基本上能够获取一组 api url,并使用每个 URL 启动 get 请求。如果 get 请求由于某种原因失败,它应该总共尝试 3 次才能再次到达该端点。但是,此类获取请求的失败不应阻止其他请求的发生,它们应该并行发生。此外,如果一个 get 请求失败 3 次,它应该被标记为失败并且它不应该停止整个过程。到目前为止,我还没有找到使用 RxJS 的方法。
    • 我根据您的最新描述编辑了一份提案
    • 我不得不稍微修改你提供的代码,让它表现出我想要的样子,但效果很好:)
    【解决方案3】:

    您的地图函数返回Rx.Observable.throw("error");。这是错误的,因为下面的retry() 将收到它的包装(即Observable<Observable<T>> 而不是Observable<T>。您可以使用flatMap 来纠正它,但是您还需要通过Rx.Observable.of(x) 包装该值.

    .flatMap(x => {
        if(x % 5 == 0) {
            console.log("error");
            return Rx.Observable.throw("error");
        }
        return Rx.Observable.of(x);
    })
    .retry(3)
    .subscribe(x => console.log(x), e => console.log(e));
    

    另一种选择是throw new Error(),而不是尝试return Rx.Observable.throw("error");


    确保你在正确的事情上重试

    Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
      .retry(3)
      .concatAll(x => Rx.Observable.of(x))
      ...
    

    【讨论】:

    • 我明白了,但出现了另一个问题。当重试启动时,它会再次启动整个 observable,这意味着检索所有用户并且永远不会超过第 5 个用户。我想要做的只是对单个用户采取行动,因此在重试时只执行 flatMap 函数中的代码,而不是整个可观察的代码。例如,对 ID 为 5 的用户的某些操作失败,其他用户应继续操作,而失败的用户应继续重试,直到重试 3 次。同样,我不知道 RxJS 是否可以做到这一点,但它是用 promises 实现的。
    • @samcp20 这是因为您正在重试一大堆 observables。相反,请重试您期望失败的内容。查看更新后的代码,其中retry 紧跟在fromPromise 之后(在concatAll 有机会产生许多新的可观察对象之前)。
    • 移动重试运算符仍然无法实现我想要做的事情。我基本上需要每个操作(本示例中的 if(x.id % 5 == 0) 代码)完全是一些可以重试但独立于其他操作的单独任务。如果一个操作失败,我不希望整个 observable 停止处理,它应该简单地为该用户输出一个错误。简而言之,每个生成的 Observable 都应该自行重试,同时允许其他 Observablea 继续处理,并且 3 次失败不应停止整个数据流。
    【解决方案4】:

    然后用from 转换promise 是不可重复的

    示例:https://stackblitz.com/edit/rxjs-dp3md8

    但是,Observable.create 效果很好

    如果有人有解决方案可以重复承诺,请发表评论。

    【讨论】:

      猜你喜欢
      • 2020-08-08
      • 2021-11-09
      • 1970-01-01
      • 2021-03-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-23
      相关资源
      最近更新 更多