【发布时间】:2018-09-04 20:55:13
【问题描述】:
https://stackoverflow.com/a/47136941/1776585 的后续问题
在使用 Flux + split() + FluxMessageChannel 时,我无法让我的集成处理程序在并行线程中运行。
考虑以下 sn-p:
// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
所有日志都在一个线程中输出:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
如何强制多线程处理?
我已经尝试在Flux 上使用.parallel().runOn(),但这只会使获取数据并行,但实际处理仍然在一个线程上运行。
我也在Flux 上尝试了.publishOn(Schedulers.parallel()),但没有效果。
并且还将ExecutorChannel 或Poller 与执行程序添加到处理程序没有帮助。
【问题讨论】:
标签: java spring spring-integration reactive-programming project-reactor