【问题标题】:RxJava - Why the executors only use one threadRxJava - 为什么执行器只使用一个线程
【发布时间】:2015-12-30 05:50:43
【问题描述】:

我创建了一个固定线程池来处理每 300 毫秒发出的事件,并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。

如果我将 sleepTime 设置为小于 300ms,则处理线程会改变,但那是无用的。

问题:我可以做些什么来使它并发?为什么程序要复用线程?

提前谢谢你

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

日志

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1

【问题讨论】:

  • 请注意:您可以使用 jvisualvm 更可靠地了解在调度方面发生了什么以及使用了哪些线程:docs.oracle.com/javase/6/docs/technotes/tools/share/…
  • @ReutSharabani 在 Eclipse Debug 视图中,我可以看到线程已经生成,但是程序只重用了一个线程。

标签: java multithreading rx-java reactivex rx-javafx


【解决方案1】:

根据我在您的代码中的理解,生产者的生产速度比订阅者快。但是Observable&lt;Long&gt; interval(long interval, TimeUnit unit) 实际上不支持Backpressure。文档指出

此运算符不支持背压,因为它使用时间。如果 下游需要更慢它应该减慢计时器或使用某些东西 比如{@link #onBackpressureDrop}。

如果您的处理速度确实比生产者慢,那么您可以在订阅者代码中执行的操作是这样的

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});

【讨论】:

  • 当然,我可以像你说的那样将任务提交到不同的线程。但我想用调度器自然地做到这一点。
  • @Rockman12352 我同意,但据我所知,Observable 每次发射都会在单个线程中执行整个执行(从生产者到订阅者)。对于生产者中的每个Long 数据的含义,它将在单线程中调用所有订阅者。我可能在这里错了,但这就是我到目前为止所得到的
【解决方案2】:

相反

 .subscribeOn(Schedulers.computation())

试试

 .observeOn(Schedulers.computation())

我之前做的这个使用 Rx 进行并发的例子非常好用

   public class ObservableZip {

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async:", start, result));
}




public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}


public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " +  Thread.currentThread()
                                                               .getName());
                     })
                     .map(val -> "!");
  }

 }

【讨论】:

  • 它不起作用。在您的示例中,3 个 observable 来自不同的线程,因此它自然是多线程的。但就我而言,我想把它分成一个池子。
  • 但是每次调用 Schedulers.computation() 并没有给你一个新线程?
  • 是的,我会试试另一台电脑
【解决方案3】:

我在 GitHub 上找到了答案!

内部的 observable 确实在多线程上发出了,但 next 上的后续不是。如果我想让它并行,我应该在内部 observable 中进行。

【讨论】:

  • 你有链接或例子吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-28
  • 1970-01-01
相关资源
最近更新 更多