【问题标题】:RxJava2's delay(rx.functions.Func1) is not emitting items in orderRxJava2 的延迟(rx.functions.Func1)没有按顺序发出项目
【发布时间】:2023-03-26 05:47:02
【问题描述】:

我正在使用delay的这个签名:

public final <U> Observable<T> delay(Func1<? super T,? extends Observable<U>> itemDelay)

javadoc

我正在使用Func1 返回一个Observable,它充当一种“触发器”。我的目标是延迟项目,直到外部异步操作完成。一旦该操作完成,我想发出所有已延迟的项目以及所有未来的项目按顺序

这是一些示例代码,显示了我正在尝试做的事情:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;

public class Example {

    private ReplaySubject<Object> delayTrigger = ReplaySubject.create(); // (1)

    public void main() {
        System.out.println("============ MAIN ============");
        SourceThread sourceThread = new SourceThread();
        sourceThread.start();

        sourceThread.stream
                .compose(doOnFirst(integer -> startAsyncOperation())) // (2)
                .delay(integer -> delayTrigger) // (3)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe((Integer integer)
                        -> System.out.println("onNext: " + integer));
    }

    private void startAsyncOperation() {
        System.out.println(">>>>>>> long async operation started");
        SomeOtherThread someOtherThread = new SomeOtherThread();
        someOtherThread.start();
    }

    private void onAsyncOperationComplete() {
        System.out.println("<<<<<<< long async operation completed");
        delayTrigger.onNext(new Object()); // (4)
    }

    /**
     * From https://stackoverflow.com/a/32366794
     */
    private <T> ObservableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return observableTransformer -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return observableTransformer.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }

    /**
     * Some thread to simulate a some time delayed source.
     * This is not really part of the problem,
     * we just need a time delayed source on another thread
     */
    private final class SourceThread extends Thread {
        private ReplaySubject<Integer> stream = ReplaySubject.create();

        @Override
        public void run() {
            super.run();
            for (int i = 0; i < 100; i++) {
                stream.onNext(i);
                System.out.println("Source emits item: " + i);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private final class SomeOtherThread extends Thread {
        @Override
        public void run() {
            super.run();
            try {
                Thread.sleep(1000);
                onAsyncOperationComplete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在 (1) 我创建了一个 ReplaySubject 作为我的触发器,在 (2) 我开始异步操作,在 (3) 我延迟直到触发器发出一些东西;最后在 (4) 处,当异步操作完成时,我将一些内容放入触发流中。

这在大多数情况下都可以正常工作,除非异步操作完成时,从delay 返回的流出现故障。

I/System.out: Source emits item: 46
I/System.out: Source emits item: 47
I/System.out: <<<<<<< long async operation completed
I/System.out: Source emits item: 48
I/System.out: onNext: 0
I/System.out: onNext: 48 <---- problem here!!!
I/System.out: onNext: 1
I/System.out: onNext: 2
I/System.out: onNext: 3

项目 48 在项目 1 - 47 之前从 delay 发出。项目 49 也将无序发出。这将一直持续到项目 1-47 被发出,然后流被清理。但是有很大一部分未订购的物品。有没有办法解决这个问题?我是否正确使用delay?这是延迟的错误吗?

仅供参考,这只是一个示例。在我的“真正”问题中,一旦发出的项目出现故障(即它们没有很好地编号),我就无法重新排序。

【问题讨论】:

    标签: delay rx-java2


    【解决方案1】:

    delay 运算符没有排序保证,因为项目#1 的内部源通常可能比项目#2 的另一个内部源发出信号晚。任何异步信号都可能会导致排序失败,即使来自诸如终止的ReplaySubject 之类的源也是如此。

    我假设您想预取主源,但不让它在外部信号之前通过,对吧?在这种情况下,您可以使用concatArrayEager,其中第一个源的完成触发预取的第二个源的发射:

    PublishSubject<Integer> delayer = PublishSubject.create();
    
    Observable.concatArrayEager(
        delayer,
        sourceThread.stream
    )
    
    // somewhere async
    delayer.onComplete();
    

    【讨论】:

    • 我认为内部排序没有保留,而是基于内部源发出发射信号的时间。感谢您提出的更改建议。
    猜你喜欢
    • 2019-12-02
    • 1970-01-01
    • 1970-01-01
    • 2021-11-13
    • 1970-01-01
    • 2014-09-15
    • 2016-02-15
    • 2016-03-24
    • 2015-07-24
    相关资源
    最近更新 更多