【问题标题】:Rxjava: Subscribe on the specific threadRxjava:订阅特定线程
【发布时间】:2015-11-06 06:47:41
【问题描述】:

我是 Rxjava 的新手。 我有以下代码:

    System.out.println("1: " + Thread.currentThread().getId());
    Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subcriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subcriber.onNext(result);
        }

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

例如,输出将是:

1:50
2:100
3:100

我希望订阅者在 id 为 50 的线程上运行。我该怎么做?

【问题讨论】:

  • 您能告诉我们您为什么要这样做吗?我们缺少一些上下文。

标签: multithreading rx-java subscriber


【解决方案1】:

我认为有两种情况。要么你需要它在 UI 线程上运行,要么因为同步。据我所知,您不能在特定线程上调用函数,因为调用该方法时,它会绑定到线程的上下文,因此不可能从一个线程调用一个方法到另一个线程。您的问题是订阅者中的方法是从Schedulers.newThread() 调用的。我还发现了这个github issue 关于Schedulers.currentThread()。您需要的是在观察者被调用时通知调用者线程。

你也可以使用akka,用它写并发​​代码更简单。

抱歉我的语法不好。

【讨论】:

  • 奇怪,他们提到了AndroidSchedulers.handlerThread,但是我的rxjava中没有这样的方法..
【解决方案2】:

来自docs

默认情况下,一个 Observable 和你应用到的操作符链 它会在同一个线程上完成它的工作,并通知它的观察者 在其上调用其订阅方法。 SubscribeOn 运算符 通过指定一个不同的调度程序来改变这种行为 Observable 应该运行。 ObserveOn 运算符指定一个 Observable 将用于发送通知的不同调度程序 对其观察者。

因此,您可以使用subscribe 而不是 subscribeOn 在创建的同一线程上观察您的收藏,如下所示:

Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subcriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subcriber.onNext(result);
        }

    }).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

更新: 如果您的应用程序是 Android 应用程序,您可以像在后台线程上一样使用订阅,并使用 Handler 消息将结果传递给主线程。

如果您的应用程序是 Java 应用程序,我可能建议使用 wait() 和 notify() 机制,或者考虑使用 EventBus 或 akka 等框架来处理更复杂的场景。

【讨论】:

    【解决方案3】:

    在 RxJava 1.0.15 中,您可以在 subscribe 之前应用 toBlocking(),一切都将在创建整个运算符序列的线程上运行。

    【讨论】:

    • 我认为 OP 不想让他/她的代码同步。相反,他想在调用每个订阅者之前切换到某个线程。
    • 该操作提到线程 50,它是启动异步计算的线程。除了阻止订阅返回到同一个线程之外,别无他法。
    • 我正在考虑向Schedulers.from(executorService) 提供ExecutorService,类似于:ExecutorService executorService = Executors.newSingleThreadExecutor(r -&gt; Thread.currentThread());。不幸的是,IllegalThreadStateException 崩溃。
    【解决方案4】:

    所以 subscribeOn 表示 Observable 将在哪个线程上开始发射项目,observeOn “切换”到 observable 链的其余部分的线程。在订阅之前放置一个 observeOn(schedulers.CurrentThread()) ,您将处于创建此 observable 的线程中,而不是在其中执行它的线程中。这是一个很好地解释 rxjava 线程的资源。 http://www.grahamlea.com/2014/07/rxjava-threading-examples/

    【讨论】:

    • RxJava 中没有 CurrentThread 调度程序,除非调度程序是单线程的,否则无法通过多个 ObserveOn 将执行返回到管道不同部分的同一线程。
    • 尝试 Schedulers.immediate(),它“创建并返回一个立即在当前线程上执行工作的 {@link Scheduler}。”
    • 这仍然不坚持创建线程,实际上是一个无操作。
    【解决方案5】:

    我相信@akarnokd 是对的。您要么在没有 .subscribeOn(Schedulers.newThread()) 的情况下运行,以便它同步发生,要么在订阅之前使用 toBlocking()。或者,如果您只是希望所有事情都发生在同一个线程上,但它不必是 current 线程,那么您可以这样做:

    Observable
        .defer(() -> Observable.create(...))
        .subscribeOn(Schedulers.newThread())
        .subscribe(subscriber);
    

    defer 确保对create 的调用发生在订阅线程上。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-06-04
      • 1970-01-01
      • 2015-07-23
      • 1970-01-01
      • 1970-01-01
      • 2019-03-19
      • 1970-01-01
      • 2021-03-25
      相关资源
      最近更新 更多