【发布时间】:2017-03-07 15:28:20
【问题描述】:
我正在运行 RxJava 并创建一个主题以使用 onNext() 方法来生成数据。我正在使用 Spring。
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
在 RxJava 流上生成新数据的方式是 @Autowire private SubjectObserver subjectObserver
然后调用subjectObserver.publish(newDataObjGenerated)
无论我为subscribeOn() 和observeOn() 指定什么:
- Schedulers.io()
- Schedulers.computation()
- 我的话题
- Schedulers.newThread
onNext() 及其内部的实际工作是在同一线程上完成的,该线程实际上调用主题上的onNext() 以生成/生成数据。
这是正确的吗?如果是这样,我错过了什么?我期待 doSomething() 在不同的线程上完成。
更新
在我的调用类中,如果我更改调用publish 方法的方式,那么当然会分配一个新线程供订阅者在其上运行。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
谢谢,
【问题讨论】:
-
您所体验的内容是设计的。如果您从 threadX 调用 onNext,则链中的下一个运算符将由同一线程调用。 subscribeOn 对您没有帮助,因为您无法从调用 onNext 的位置进行操作,您只能使用 toSerialized 来维护合约。我建议您使用 onNext-operator 执行“事件”实例,然后使用 observeOn 并在更改线程后使用 flatMap 调用 doSomething()。在 subscribe-callback 中,您可以设置您的 UI 或您想要对结果执行的任何操作。
标签: spring rx-java subject-observer