【问题标题】:Limit actions with rxJava and retryWhen operator使用 rxJava 和 retryWhen 运算符限制操作
【发布时间】:2016-07-10 18:51:23
【问题描述】:

我的应用程序通常必须做两件事:

  • 同时只接受一个网络请求
  • 请求失败时重试

我就是这样实现的:

public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if(!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }

}

看起来不错。

现在我的retryWhen 实现:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we  retry
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }

        // For anything else, don't retry
        return Observable.error(error);
    });
}

这是我的使用方法:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}

doOnUnsubscribe() 调用任何错误的主要问题,然后储物柜对任何新请求都是打开的,直到计时器到期并再次发生重新订阅。那就是问题所在。当计时器计时,用户可以发出另一个请求。

我该如何解决?

【问题讨论】:

  • 您能否发布一些代码,展示您如何实际使用applyLocker 转换器和retryWhenAnyIoExceptionWithDelay

标签: android rx-java reactive-programming rx-android reactivex


【解决方案1】:

问题是您将转换器应用到源 observable,即在您的 retrywhen 之前。 当出现错误时,您总是要取消订阅然后重新订阅源 observable 导致您的doOnUnsubscribe 被调用。

建议你试试

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));            
}


public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
            .subscribe(subscriber);
}

PS:apply locker 转换器看起来有点不同,即它在您链接的代码中没有参数。

【讨论】:

  • 是的,我明白了。这就是问题所在:我决定尝试 android10 的 CleanAcritecture。所以,RequetsLockerfinishService 方法在 data layer 上。当域上的finishCarService 方法时。现在我正在考虑将 retryWhen 运算符移至 data layer。但它打破了 CA 的理念,因为data layer 必须决定“做什么”,而不是“如何”。另一种方式——让RequestLockerdomain layer。在这种情况下,我必须将数据来源通知domain layer,因为RequestLocker 必须仅应用于网络请求,而不是getFromCache。这也是一种不好的做法。嗯..
【解决方案2】:

使用retryWhen,为了避免取消订阅onError,你必须使用onErrorResumeNext,它不会取消订阅。

看看这个例子

/**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

还有关于并发,如果在flatMap算子中做操作,可以指定Max concurrent。

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func), maxConcurrent);
}

您可以在此处查看更多示例https://github.com/politrons/reactive

【讨论】:

  • 谢谢。我现在终于不能理解你的代码,但我会在几天内测试它。
【解决方案3】:

我当前的解决方案不是在 IoException 上解锁 RequestLocker,因为在这种情况下,请求将在延迟后重复。

public <T> Observable.Transformer<T, T> applyLocker() {
    if(!isLocked()) {
        return observable -> observable.doOnSubscribe(() -> {
            lockChannel();
        }).doOnNext(obj -> {
            freeChannel();
        }).doOnError(throwable -> {
            if(throwable instanceof IOException) {
                return; // as any request will be repeated in case of IOException
            }
            freeChannel(channel);
        });
    } else {
        return observable -> Observable.error(new ChannelBusyException("Channel is busy now"));
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多