【问题标题】:RxJava - Wait till retryWhen finish for other observables in other Activities/FragmentsRxJava - 等到 retryWhen 完成其他活动/片段中的其他可观察对象
【发布时间】:2017-06-22 17:18:40
【问题描述】:

用例:我正在开发一个 Android 应用程序,它有一个带有 4 个标签的 viewpager,它们都是 Fragments。对于每个选项卡/片段,我必须每 5 分钟使用 Oauth 和令牌过期连接到 REST Api。

当前解决方案: 使用 RxJava 和 retryWhen 运算符我可以在收到 401 HTTP 错误时重新进行身份验证。对于每个订阅和消费的 Observable 流并使用:

retryWhen(refreshTokenAuthenticator)

因此,当令牌过期时,流会使用它,然后执行真正的 api 调用。

问题:这仅适用于一个订阅中消耗的一个 observable,但考虑到 401 错误可能随时出现,我需要允许用户在标签之间切换而不阻止他/她在任何 Api 调用的任何片段中。

问题:有没有办法让 observables 等待其他 observables 用 onNext() 结束,它们不在同一个流/订阅者中?实际上在不同的片段中?所以api调用场景会是这样的:

Api Call Fragment A --> request
Api Call Fragment A <-- response 200 Code

Api Call Fragment B --> request
Api Call Fragment B <-- response 401 Code (retryWhen in action)
Api Call Fragment B --> request (refreshToken)
Api Call Fragment B <-- response 200 (with new access token saved in the app)

几乎同时...

Api Call Fragment C --> request
Api Call Fragment C <-- response 401 Code (retryWhen in action)

Observable in Fragment C Waits till Observable in Fragment B finish (onNext())

Api Call Fragment C --> request
Api Call Fragment C <-- response 200

这是我已经拥有的,每个 API 调用看起来都差不多:

public void getDashboardDetail() {

    Subscription subscription = repository.getDashboard()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(tokenAuthenticator)
            .subscribe(new RestHttpObserver<UserDataDto>() {
                @Override
                public void onUnknownError(Throwable e) {
                    getMvpView().onError(e);
                }

                @Override
                public void onHostUnreachable() {
                    getMvpView().onHostUnreachable();
                }

                @Override
                public void onHttpErrorCode(int errorCode, ErrorDto errorDto) {
                    getMvpView().onHttpErrorCode(errorCode, errorDto);
                }

                @Override
                public void onCompleted() {
                    //Do nothing...
                }

                @Override
                public void onNext(UserDataDto response) {
                    getMvpView().onReceiveUserData(response);
                }
            });

    this.compositeSubscription.add(subscription);

}

还有我的 RefreshTokenAuthenticator:

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> {

private static final int RETRY_COUNT = 1;

private static final int HTTP_ERROR_CODE = 401;

@Inject
private UserRepository repository;

@Inject
private SessionManager sessionManager;

@Inject
private MyApplication application;


@Inject
private RefreshTokenAuthenticator() {
}

@Override
public synchronized Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .flatMap(new Func1<Throwable, Observable<?>>() {
                int retryCount = 0;

                @Override
                public Observable<?> call(final Throwable throwable) {

                    retryCount++;
                    if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) {
                        int errorCode = ((HttpException) throwable).code();
                        if (errorCode == HTTP_ERROR_CODE) {
                            return repository
                                    .refreshToken(sessionManager.getAuthToken().getRefreshToken())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribeOn(Schedulers.io())

                                    .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto))
                                    .doOnError(throwable1 -> {
                                        Log.e("RefreshTokenAuth", "DoOnError", throwable1);
                                        application.logout();
                                    });

                        }
                    }
                    // No more retries. Pass the original Retrofit error through.
                    return Observable.error(throwable);
                }
            });
}

}

【问题讨论】:

    标签: android android-fragments rx-java reactive-programming


    【解决方案1】:

    1) 使身份验证令牌的源缓存上一个成功的结果 + 提供方法使这个缓存的结果无效:

    class Auth {
        private Observable<AuthToken> validToken;
    
        synchronized void invalidateAuthToken() {
            validToken = null;
        }
    
        synchronized Observable<AuthToken> getAuthToken() {
            if (validToken == null) {
                validToken = repository
                    .refreshToken(...) // start async request
                    .doOnError(e -> invalidateAuthToken())
                    .replay(1); // cache result
            }
            return validToken; // share among all subscribers
        }
    }
    

    2) 要访问 Web 服务,请使用以下模式:

    Observable<Data1> dataSource1 = 
        Observable.defer(auth.getAuthToken()) // always start from token
            .flatMap(token ->
                repository.fetchData1(token, ...)) // use token to call web service
            .doOnError(e -> auth.invalidateAuthToken())
            .retry(N); // retry N times
    

    【讨论】:

    • 我正在使用和 Retrofit 2 的拦截器来拦截请求并将标头附加到令牌中。您认为您的答案(以 Rx 方式使用 authtoken)更好还是至少更容易处理?
    • @NicolasJafelle 我的例子是通用的。您可以实现fetchData1 函数,以便它以改造方式注入令牌。
    【解决方案2】:

    如果应用当前是否正在重新进行身份验证,最后只需添加一个全局(在我的 Application 类中)布尔值即可。它实际上允许两个 401 HTTP 错误,但第二个在 onNext() 中继续并重新执行初始 observable。我想做一些更具反应性的事情,但至少这解决了我的主要问题。

    public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> {
    
    private static final int RETRY_COUNT = 1;
    
    private static final int HTTP_ERROR_CODE = 401;
    
    @Inject
    private UserRepository repository;
    
    @Inject
    private SessionManager sessionManager;
    
    @Inject
    private MyApplication application;
    
    
    @Inject
    private RefreshTokenAuthenticator() {
    }
    
    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    int retryCount = 0;
    
                    @Override
                    public Observable<?> call(final Throwable throwable) {
    
                        retryCount++;
                        if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) {
                            int errorCode = ((HttpException) throwable).code();
    
                            if (errorCode == HTTP_ERROR_CODE) {
    
                                Log.i("RefreshTokenAuth", "APPLICATION IS AUTHENTICATING = " + application.isAuthenticating);
                                if (!application.isAuthenticating) {
                                    application.isAuthenticating = true;
    
                                    String refreshToken = sessionManager.getAuthToken().getRefreshToken();
    
                                    return repository
                                            .refreshToken(refreshToken)
                                            .observeOn(AndroidSchedulers.mainThread())
                                            .subscribeOn(Schedulers.io())
                                            .doOnCompleted(() -> application.isAuthenticating = false)
                                            .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto))
                                            .doOnError(throwable1 -> {
                                                Log.e("RefreshTokenAuth", "DoOnError", throwable1);
                                                application.logout();
                                            });
                                } else {
                                    return Observable.just(1).doOnNext(o -> Log.i("RefreshTokenAuth", "Let's try another shot!"));
                                }
                            }
                        }
                        // No more retries. Pass the original Retrofit error through.
                        return Observable.error(throwable);
                    }
                });
    }
    

    }

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-06-29
      • 2021-12-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多