【发布时间】:2019-07-23 16:25:57
【问题描述】:
据我了解,下游的其余部分需要在线程池中的一个线程上处理(我将其设置为 1024)
这是我的代码。
Flux<String> ips =
Flux.fromIterable(items).map(Item::getIp);
ips
.publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
.map(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
return response.code();
} catch (Exception e) {
}
return -1;
})
.subscribe(System.out::println);
由于某种原因,与以下代码相比,此代码非常慢:
appRules
.stream()
.parallel()
.map(Item::getIp)
.forEach(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
System.out.println(response.code());
} catch (Exception e) {
}
System.out.println(-1);
});
为什么?当您受 IO 限制时,同时处理项目流的正确方法是什么? (而不是 CPU)
【问题讨论】:
-
考虑到并行流用于 CPU 绑定操作,这是一个令人惊讶的结果。您提供的示例代码以其当前形式并不真正有用。您能否附上一些日志,也许我们可以在其中看到线程名称和时间戳?这将有助于了解发生了什么。了解您处理多少延迟和并发也会很有趣。
标签: java reactive-programming project-reactor