【发布时间】:2024-01-11 00:40:01
【问题描述】:
我有一个单身人士的以下课程:
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
在活动中,我观察会话并对每次更改执行网络操作:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
当我订阅会话存储时,主题立即在io() 上发出,因为它是BehaviourSubject,而订阅者在mainThread() 上执行。
当我已经订阅了sessionStore.set(new AnotherSession()) 时,问题就出现了。 IMO 这应该在io() 调度程序上执行init() 中定义的流。然而,发生的事情是流在调用subject.onNext() 的同一线程上执行。当我在flatMap() 中进行网络操作时,导致进入NetworkOnMainThreadException。
我是否理解错了主题?我会这样滥用它们吗?请问什么是正确的解决方案?
我还尝试在observe() 方法中用Observable.fromEmitter() 替换整个主题方法,但令人惊讶的是输出完全相同。
【问题讨论】:
标签: android multithreading rx-java scheduler subject