【问题标题】:RxJava Subject emits on incorrect schedulerRxJava 主题在不正确的调度程序上发出
【发布时间】:2024-01-11 00:40:01
【问题描述】:

我有一个单身人士的以下课程:

public class SessionStore {
    Subject<Session, Session> subject;

    public SessionStore() {
       subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<UserSession> observe() {
        return subject.distinctUntilChanged();
    }
}

在活动中,我观察会话并对每次更改执行网络操作:

private Subscription init() {
    return sessionStore
            .observe()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Func1<Session, Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    return retrofitService.getAThing();
                }
            })
            .subscribe(...);
}

当我订阅会话存储时,主题立即在io() 上发出,因为它是BehaviourSubject,而订阅者在mainThread() 上执行。

当我已经订阅了sessionStore.set(new AnotherSession()) 时,问题就出现了。 IMO 这应该在io() 调度程序上执行init() 中定义的流。然而,发生的事情是流在调用subject.onNext() 的同一线程上执行。当我在flatMap() 中进行网络操作时,导致进入NetworkOnMainThreadException

我是否理解错了主题?我会这样滥用它们吗?请问什么是正确的解决方案?

我还尝试在observe() 方法中用Observable.fromEmitter() 替换整个主题方法,但令人惊讶的是输出完全相同。

【问题讨论】:

    标签: android multithreading rx-java scheduler subject


    【解决方案1】:

    请看一下《Reactive Programming with RxJava》一书中的以下部分

    默认情况下,在 Subject 上调用 onNext() 会直接传播到所有 Observer 的 onNext() 回调方法。这些方法使用相同的名称也就不足为奇了。在某种程度上,在 Subject 上调用 onNext() 会间接在每个订阅者上调用 onNext()。

    让我们回顾一下: 如果您从 Thread-1 对 Subject 调用 onNext,它将从 Thread-1 调用 onNext 到订阅者。 onSubscribe 将被丢弃。

    所以首先要做的事情是: 订阅将发生在哪个线程上:

    retrofitService.getAThing()
    

    我只是猜测,并说它是调用线程。这将是observeOn中描述的线程,即Android-UI-Loop。

    observeOn 下的每个值都将按照调度程序的指定从 Thread-a 转移到 Thread-b。 UI-Loop 上的 observeOn 应该在订阅之前发生。订阅中接收到的每个值都在 UI-Loop 上,不会阻塞 UI 线程或以异常结束。

    请看一下示例代码和输出:

    class SessionStore {
        private Subject<String, String> subject;
    
        public SessionStore() {
            subject = BehaviorSubject.create("wurst").toSerialized();
        }
    
        public void set(String session) {
            subject.onNext(session);
        }
    
        public Observable<String> observe() {
            return subject
                    .asObservable()
                    .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
                    .distinctUntilChanged();
        }
    }
    
    @Test
    public void name() throws Exception {
        // init
        SessionStore sessionStore = new SessionStore();
    
        TestSubscriber testSubscriber = new TestSubscriber();
        Subscription subscribe = sessionStore
                .observe()
                .flatMap(s -> {
                    return Observable.fromCallable(() -> {
                        System.out.println("flatMap Thread:: " + Thread.currentThread());
                        return s;
                    }).subscribeOn(Schedulers.io());
                })
                .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
                .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
                .subscribe(testSubscriber); // Do UI-Stuff in subscribe
    
        new Thread(() -> {
            System.out.println("set on Thread:: " + Thread.currentThread());
            sessionStore.set("123");
        }).start();
    
        new Thread(() -> {
            System.out.println("set on Thread:: " + Thread.currentThread());
            sessionStore.set("345");
        }).start();
    
        boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
    
        Assert.assertTrue(b);
    }
    

    输出::

    Receiving value on Thread:: Thread[main,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    set on Thread:: Thread[Thread-1,5,main]
    set on Thread:: Thread[Thread-0,5,main]
    Receiving value on Thread:: Thread[Thread-1,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    Receiving value on Thread:: Thread[Thread-1,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    

    【讨论】:

    • 谢谢,我还没有考虑为在 flatMap 中创建的 Observable 定义调度程序。我认为如果我在平面地图之后定义调度程序,它将应用于整个流。
    • 这取决于你想要达到的目标。您可以在 flatMap 之前使用 observeOn。链中的每个值都将从线程 X 的 observeOn 点移动到另一个线程。但是,通过将 subscribeOn 指定到 flatMap 中的 observable 上,您实现了并发。所以 onSubscribe / onObserve 是有区别的。 OnObserve 可以在一个 observable 中多次放置,以便将排放物转移到另一个线程。对于 subscribeOn 应该只有一个可观察对象,因为将使用最上层的 subscribeOn。
    • 考虑到这一点,我不明白为什么当我从flatMap() 内部删除subscribeOn() 并将其放在前一步时,流的行为会有所不同。喜欢sessionStore.observe().subscribeOn().flatMap()...
    • 在 flatMap 中,您正在创建一个新的 observable,它将使用 subscribeOn。 FlatMap 为每个项目订阅 observable(使用 subscribeOn)并合并所有发出的值。让我们考虑使用 onNext 发出的 3 个值。每个值都将在 flatMap 中使用,这将导致一个新的 observable 将被订阅。您将拥有三个将合并在一起的流,它们都从不同的线程发出。如果你在 flatMap 之前使用 subscribeOn,它只会说订阅给定的 observable 将发生在给定的调度程序上。与 flatMap 不同,不会发生并发
    • 嘿,你能不能也回答一下这个问题? *.com/questions/40762911/…
    【解决方案2】:

    当您调用操作员时,它会影响整个下游。如果你打电话:

    .observeOn(AndroidSchedulers.mainThread())
    

    在不正确的地方,流的其余部分在指定线程上执行。

    我建议你总是添加:

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    

    在流的最后:

    private Subscription init() {
        return sessionStore
                .observe()
                .flatMap(new Func1<Session, Observable<Object>>() {
                    @Override
                    public Observable<Object> call(Session session) {
                        return retrofitService.getAThing();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(...);
    }
    

    【讨论】:

    • 谢谢。尽管我确实在流的末尾指定了调度程序,但问题一直存在。
    • retrofitService.getAThing() 没有指定任何调度器?
    • 不。我还简化了示例以提高可读性。
    【解决方案3】:

    我认为您忘记了您的 Subject 也是观察者,因此为了让 onNext 在 io 线程上运行尝试

    public class SessionStore {
        Subject<Session, Session> subject;
    
        public UserSessionStore() {
           subject = new SerializedSubject<>(BehaviorSubject.create(new Session())).observeOn(Schedulers.io());
        }
    
        public void set(Session session) {
            subject.onNext(session);
        }
    
        public Observable<UserSession> observe() {
            return subject.distinctUntilChanged();
        }
    }
    

    【讨论】:

    • 我也试过那个。但是通过在distinctUntilChanged() 之后添加io()。另外,我相信您的意思是 subscribeOn() 而不是 observeOn(),并且 SerializedSubject 的构造函数采用 Subject 而不是 Observable,因此您不能按照您所写的方式进行操作。
    • 是的,这是我的错字。 observeOn 应该在 SerializedSubject 上,但它仍然应该是 observeOn