【问题标题】:run PublishSubject on different thread rxJava在不同的线程 rxJava 上运行 PublishSubject
【发布时间】:2017-03-07 15:28:20
【问题描述】:

我正在运行 RxJava 并创建一个主题以使用 onNext() 方法来生成数据。我正在使用 Spring

这是我的设置:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

在 RxJava 流上生成新数据的方式是 @Autowire private SubjectObserver subjectObserver 然后调用subjectObserver.publish(newDataObjGenerated)

无论我为subscribeOn()observeOn() 指定什么:

  • Schedulers.io()
  • Schedulers.computation()
  • 我的话题
  • Schedulers.newThread

onNext() 及其内部的实际工作是在同一线程上完成的,该线程实际上调用主题上的onNext() 以生成/生成数据。

这是正确的吗?如果是这样,我错过了什么?我期待 doSomething() 在不同的线程上完成。

更新

在我的调用类中,如果我更改调用publish 方法的方式,那么当然会分配一个新线程供订阅者在其上运行。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

谢谢,

【问题讨论】:

  • 您所体验的内容是设计的。如果您从 threadX 调用 onNext,则链中的下一个运算符将由同一线程调用。 subscribeOn 对您没有帮助,因为您无法从调用 onNext 的位置进行操作,您只能使用 toSerialized 来维护合约。我建议您使用 onNext-operator 执行“事件”实例,然后使用 observeOn 并在更改线程后使用 flatMap 调用 doSomething()。在 subscribe-callback 中,您可以设置您的 UI 或您想要对结果执行的任何操作。

标签: spring rx-java subject-observer


【解决方案1】:

Observable/Subject 上的每个运算符都返回一个具有额外行为的新实例,但是,您的代码仅应用 subscribeOnobserveOn 然后丢弃它们生成的任何内容并订阅原始 Subject .您应该链接方法调用然后订阅:

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();

safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
     @Override
     public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
         LOGGER.debug("{} invoked.", Thread.currentThread().getName());
         doSomething();
     }
});

请注意,subscribeOnPublishSubject 没有实际影响,因为在其 subscribe() 方法中不会发生订阅副作用。

【讨论】:

  • 太棒了!非常感谢,只是忽略了这个关键方面。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-04-05
相关资源
最近更新 更多