【问题标题】:RxJava .subscribeOn(Schedulers.newThread()) questionsRxJava .subscribeOn(Schedulers.newThread()) 问题
【发布时间】:2020-01-25 07:42:28
【问题描述】:

我使用的是普通的 JDK 8。我有这个简单的 RxJava 示例:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));

它逐行打印出单词,与线程相关的信息交织在一起,正如预期的那样,这是所有下一次调用的“主要”。

但是,当我取消注释 subscribeOn(Schedulers.newThread()) 调用时,根本没有打印任何内容。为什么它不起作用?我本来希望它为每个onNext() 调用和doOnNext() 启动一个新线程以打印该线程的名称。现在,我什么也看不到,其他调度程序也是如此。

当我在 main 的末尾添加对 Thread.sleep(10000L) 的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程。是这样吗?这是否可以通过某种方式进行更改,但使用自定义 ThreadFactory 或类似概念,而不必实现自定义调度程序?

通过上述更改,线程名称始终为RxNewThreadScheduler-1,而newThread 的文档显示“为每个工作单元创建一个新{@link Thread} 的调度程序”。不是应该为所有排放创建一个新线程吗?

【问题讨论】:

  • 你是对的,所有RxJava线程默认都是守护进程。您可以通过提供自定义 RxJavaSchedulersHook 来覆盖此行为
  • 使用钩子我仍然需要提供一个成熟的调度器实现。我实际上可以不用钩子,只需将我自己的调度程序传递给 subscribeOn 方法。使用钩子我可以全局更改 Schedulers.io() 等方法返回的内容,但我不需要这个。不过还是谢谢。

标签: java rx-java


【解决方案1】:

正如 Vladimir 提到的,RxJava 标准调度程序在守护线程上运行工作,这些线程在您的示例中因主线程退出而终止。我想强调的是,他们不会将每个值都安排在新线程上,而是将每个订阅者的值流安排在新创建的线程上。第二次订阅会给你“RxNewThreadScheduler-2”。

您实际上不需要更改默认调度程序,只需使用 Schedulers.from() 包装您自己的基于 Executor 的调度程序,并在需要时将其作为参数提供:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
        0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);

Scheduler s = Schedulers.from(exec);

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
    Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));

我有一系列关于 RxJava 调度程序的blog posts,它们应该可以帮助您实现“更永久”的变体。

【讨论】:

    【解决方案2】:

    与新手的看法相反,反应式流本质上不是并发的,而是本质上是异步的。它们本质上也是顺序的,并且必须在流中配置并发。简而言之,反应式流在其末端自然是顺序的,但在其核心可以是并发的

    秘诀是在流中使用 flatMap() 运算符。该运算符从源流中获取 Observable 输入,并在内部将其作为它订阅的 Observable> 流重新发出 strong>一次所有实例。只要 flatMap() 内部流在多线程上下文中执行,它就会同时执行提供的 Function 应用您的逻辑,最后重新发出结果在原始流上作为它自己的排放。

    这听起来很复杂(乍一看有点复杂),但带有解释的简单示例有助于理解这个概念。

    从类似的问题 here 和关于 RxJava2 SchedulersConcurrency 的文章中查找更多详细信息,其中包含代码示例以及有关如何顺序和同时使用调度程序的详细说明。

    希望这会有所帮助,

    软蛋

    【讨论】:

      【解决方案3】:
      public class MainClass {
      
          public static void main(String[] args) {
      
              Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10, Executors.defaultThreadFactory()));
      
              Observable.interval(1,TimeUnit.SECONDS)
                      .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                              Thread.currentThread().getName()))
                      .subscribeOn(scheduler)
                      .observeOn(Schedulers.io())
                      .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                              Thread.currentThread().getName()))
                      .subscribe();
          }
      
      }
      

      【讨论】:

        猜你喜欢
        • 2019-11-23
        • 2016-01-29
        • 1970-01-01
        • 2018-05-07
        • 1970-01-01
        • 2019-05-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多