【问题标题】:RxJava: Execute second observables only if first one throws an error and repeat from the firstRxJava:仅当第一个抛出错误并从第一个重复时才执行第二个 observable
【发布时间】:2017-08-17 13:55:40
【问题描述】:

我正在使用retorift 访问getAricle api 并获取与用户相关的文章列表。如果传递的令牌过期,getArticle api 将抛出错误,如果是这样,我必须调用 refreshToken api 来获取新令牌,然后我必须再次调用 getArticle api

 ApiController.createRx().getArticle(token)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ response -> toast(response.body().url) }, { e ->
                println(e.printStackTrace())
                if(e is HttpException && e.code() in  arrayOf(401,403)){                      
                   //Here I want to call refresh tolken api
                   toast("Auth error")
                }
                else
                   toast(R.string.something_went_wrong)
            })

编辑

尽管给出的答案显示了一些方向,但这些并不是我问题的直接答案。这就是解决它的方法,但我觉得这可以重构为更好的代码

ApiController.createRx().getArticle(Preference.getToken())
            .flatMap { value ->
                if (value.code() in arrayOf(403, 401)) {
                    ApiController.refreshToken()
                    ApiController.createRx().getArticle(Preference.getToken())
                } else Observable.just(value)
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ response -> println("Success") }, { e ->
                e.printStackTrace()
                toast(R.string.something_went_wrong)
            })



fun refreshToken() {
        val token:String?=ApiController.createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst()?.body()?.token
        if (token != null) Preferences.setAuthToken(token)
    }

编辑

我将代码重构为更简洁的版本

Observable.defer { ApiController.createRx().getArticle(Preferences.getToken()) }
            .flatMap {
                if (it.code() in arrayOf(401, 403)) {
                    ApiController.refreshToken()
                    Observable.error(Throwable())
                } else Observable.just(it)
            }
            .retry(1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({println("Success") }, {
              it.printStackTrace()
              toast(R.string.something_went_wrong)
            })



 fun refreshToken() {
        var token: String? = null
        try {
            token = createRx().refreshToken(Preferences.getRefreshToken()).blockingFirst().body()!!.token
        } catch (e: Exception) {
            throw e
        }
        println("saving token")
        if (token != null) Preferences.setAuthToken(token)
    }

编辑

请查看我对最终重构代码的回答

【问题讨论】:

  • 首先,通过改造,您可以避免直接访问onError。您可以返回Single<Result<CustomObject>>,它永远不会返回错误。但是如果你想继续得到onError,请尝试使用RxJava的错误处理运算符here
  • @masp 在这两种情况下,我都无法理解如何调用 getArticle api。你能告诉我怎么做吗
  • 你能解决你的问题吗?提供的任何答案是否有帮助?如果是这样,请考虑投票任何有用的答案,如果其中一个让您找到解决方案,请接受该答案。
  • @theFunkyEngineer 嘿,我花了一些时间解决了这个问题,我把它写成答案请评论你的建议

标签: android rx-java retrofit2 rx-java2 rx-kotlin


【解决方案1】:

我已经实现了这个确切的东西。这是该代码的略微修改版本:

private Observable<Object> refreshTokenIfNotAuthorized(Observable<? extends Throwable> errors) {
    final AtomicBoolean alreadyRetried = new AtomicBoolean(false);

    return errors.flatMap(error -> {

        boolean isAuthorizationError = /* some logic analyzing each error*/ ;

        if (isAuthorizationError && !alreadyRetried.get()) {
            try {
                alreadyRetried.set(true);
                String newToken = federatedTokenRefresher.refreshToken()
                                                         .toBlocking()
                                                         .first();

                setLogin(newToken);
                return Observable.just(null);

            } catch (Exception e) {
                return Observable.error(error);
            }

        }
        return Observable.error(error);
    });
}

你可以像这样使用这个方法:

doSomethingRequiringAuth().retryWhen(this::refreshTokenIfNotAuthorized);

【讨论】:

  • @Praveen 我同意您的解决方案有效,但正如您所说,并不完全优雅。我的提案中有什么不符合您的需要吗?如果有,是什么?
  • 起初我以为改造会将所有 4xx 错误都扔给onError,后来我意识到这只是直接发送给onError 的网络错误,所以retryWhen 将不起作用。我只能使用flatMap。在您的代码中,您如何取消订阅此federatedTokenRefresher.refreshToken() .toBlocking() .first()
  • 拨打toBlocking时无需订阅/取消订阅。
  • 如果我在阻塞调用过程中关闭应用会怎样?
  • 如果“关闭”是指置于后台,它会继续运行。当然,作为生命周期处理的一部分,您应该取消订阅 UI。至于您关于处理网络错误的问题 - 我建议您在 .retryWhen(this::refreshTokenIfNotAuthorized) 之前的链中添加另一个 retryWhen 运算符并在那里处理网络错误。此外,一般来说,如果您尝试执行的操作由多个 API 调用组成,那么在出现网络错误时让用户重试可能会更容易。
【解决方案2】:

您将收到什么样的错误?看来您可以使用onErrorResumeNext 运算符。

这个操作符一旦接收到一个 throwable,允许你返回一个 Observable 而不是 onError 中的 throwable

@Test
    public void observableOnErrorResumeException() {
        Integer[] numbers = {0, 1, 2, 3, 4, 5};

        Observable.from(numbers)
                .doOnNext(number -> {
                    if (number > 3) {
                        try {
                            throw new IllegalArgumentException();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                })
                .onErrorResumeNext(t -> Observable.just(666))
                .subscribe(System.out::println);

    }

您可以在这里查看更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

【讨论】:

  • 那么在处理错误并刷新令牌后如何重复
  • 您可以使用 retryWhen 运算符,看看我在链接中粘贴的其余示例
【解决方案3】:

我会给你另一个使用 groupBy 运算符的选项

/**
 * In this example we create a response code group.
 */
@Test
public void testGroupByCode() {
    Observable.from(Arrays.asList(401,403, 200))
            .groupBy(code -> code)
            .subscribe(groupByCode -> {
                switch (groupByCode.getKey()) {
                    case 401: {
                        System.out.println("refresh token");
                        processResponse(groupByCode);
                        break;
                    }
                    case 403: {
                        System.out.println("refresh token");
                        processResponse(groupByCode);
                        break;
                    }
                    default: {
                        System.out.println("Do the toast");
                        processResponse(groupByCode);
                    }
                }
            });
}

private void processResponse(GroupedObservable<Integer, Integer> groupByCode) {
    groupByCode.asObservable().subscribe(value -> System.out.println("Response code:" + value));
}

【讨论】:

  • 我无法理解这一点。这太笼统了。如何将此应用于我的场景,并且 Rx java2 中也没有asObservable。刷新令牌后您在哪里尝试原始可观察对象
  • 查看文档中的 groupby 运算符
  • 我知道groupBy 运算符,但如何将其应用于我的场景
  • 正如我在回答中所做的那样,您的组包含响应的值以及可观察的值。如果是 401-403,则使用刷新创建一个新的 observable,默认只订阅
  • 顺便说一句,不说比你原来的解决方案好。只是不同;)
【解决方案4】:

在阅读了有关RxJava 的更多信息后,我解决了我的问题,这就是我实现它的方式。 首先将retrofitonErroronNext\onSuccess 抛出4xx 错误取决于我们如何定义它。 例如:

@GET("content") fun getArticle(@Header("Authorization") token: String):Single<Article>

这会将所有 4xx 错误抛出到 onError 而不是 Single&lt;Article&gt; 如果您将其定义为 Single&lt;Response&lt;Article&gt;&gt; 那么来自服务器的所有响应(包括 4xx)都将转到 onNext\onSuccess

Single.defer { ApiController.createRx().getArticle(Preferences.getAuthToken())}
                .doOnError {
                    if (it is HttpException && it.code() == 401)
                        ApiController.refreshToken()
                }
                .retry { attempts, error -> attempts < 3 && error is HttpException && error.code() == 401 }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({println("Success") }, {
                  it.printStackTrace()
                  toast(R.string.something_went_wrong)
                })

我使用defer 作为我实际Observable 的包装器,因为我想在令牌刷新后重新创建可在重试时观察到的文章获取,因为我希望在我的刷新令牌代码存储新获取时再次调用Preferences.getAuthToken()令牌优先。

如果HttpException 为 401 且未尝试重试超过 2 次,则retry 返回 true

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-29
    • 1970-01-01
    相关资源
    最近更新 更多