【问题标题】:Project Reactor: downstream is slowProject Reactor:下游很慢
【发布时间】:2019-07-23 16:25:57
【问题描述】:

据我了解,下游的其余部分需要在线程池中的一个线程上处理(我将其设置为 1024)

这是我的代码。

Flux<String> ips =
        Flux.fromIterable(items).map(Item::getIp);
ips
        .publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
        .map(ip -> {
            try {
                Request request = new Request.Builder().url("https://" + ip + ":443").build();
                Response response = okHttpClient.newCall(request).execute();
                return response.code();
            } catch (Exception e) {
            }

            return -1;
        })
        .subscribe(System.out::println);

由于某种原因,与以下代码相比,此代码非常慢:

appRules
        .stream()
        .parallel()
        .map(Item::getIp)
        .forEach(ip -> {
            try {
                Request request = new Request.Builder().url("https://" + ip + ":443").build();
                Response response = okHttpClient.newCall(request).execute();
                System.out.println(response.code());
            } catch (Exception e) {
            }

            System.out.println(-1);
        });

为什么?当您受 IO 限制时,同时处理项目流的正确方法是什么? (而不是 CPU)

【问题讨论】:

  • 考虑到并行流用于 CPU 绑定操作,这是一个令人惊讶的结果。您提供的示例代码以其当前形式并不真正有用。您能否附上一些日志,也许我们可以在其中看到线程名称和时间戳?这将有助于了解发生了什么。了解您处理多少延迟和并发也会很有趣。

标签: java reactive-programming project-reactor


【解决方案1】:

执行速度较慢的原因是 Reactor 管道执行默认是单线程的。因此,当您使用Flux.publishOn 运算符时,您只是说您希望这部分管道在给定线程池中的线程上执行,但它不会同时在单独的线程上执行每个项目。

实现并发的一种选择是使用parallel Flux,它创建了所谓的rails,数据可以在其中并行流动,但它主要用于CPU密集型操作。

更好的选择是将阻塞代码包装在 Mono 中并将其委托给专用线程池,类似于您所做的,只是这一次每个任务都会获得自己的线程:

private static void reactorProcess()
{
    ExecutorService executor = Executors.newFixedThreadPool(1024);

    Flux.range(1, 1024)
        .flatMap(a -> Mono.fromRunnable(() -> simulateHttpCall())
                          .subscribeOn(Schedulers.fromExecutor(executor)))
        .blockLast();

    executor.shutdown();
}

private static void simulateHttpCall()
{
    try
    {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName() + ": " + ZonedDateTime.now());
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}

我还要注意,Java 并行流不是这种处理的可行替代方案。默认情况下,它使用ForkJoinPool,这也适用于受 CPU 限制的操作,并且只使用与机器中 CPU 内核数一样多的线程。

除此之外,如果您想充分利用反应式编程的全部功能,您应该考虑使用支持非阻塞 IO 的 HTTP 客户端,例如 Spring 的 WebClient。通过使用非阻塞 HTTP 客户端,您无需再担心定义线程池,因为不会阻塞任何线程,并且固定的少量线程将能够处理数千个并发请求。

【讨论】:

  • 非常感谢您的详细回答!我想我会先尝试 WebClient,但学习如何直接进行操作非常棒。
猜你喜欢
  • 2019-12-06
  • 1970-01-01
  • 2018-09-26
  • 2017-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-08
相关资源
最近更新 更多