【发布时间】:2020-12-10 21:16:34
【问题描述】:
在使用 Apache Camel (3.6.0) 的 Spring Boot (2.3.0.RELEASE) 应用程序中,我遇到了一个奇怪的行为,并行模式下的调度程序组件;特别是,我想创建一个每 n 秒执行给定逻辑的路由。 为此,我编写了以下示例:
@Component
public class TestRoute extends RouteBuilder {
@Override
public void configure() {
from("scheduler:testRoute?delay=2000")
.log(LoggingLevel.INFO, "Test route begin")
.setBody(this::generateRandomBody)
.split(body())
.parallelProcessing()
.process(this::consumeElement)
.end()
.log(LoggingLevel.INFO, "Test route end");
}
private List<Integer> generateRandomBody(Exchange exchange) {
return IntStream.range(0, random(20)).boxed().collect(toList());
}
private void consumeElement(Exchange exchange) throws InterruptedException {
Thread.sleep(random(1000));
}
private int random(int max) {
return new Random().nextInt(max) + 1;
}
scheduler.delay 参数设置为 2000,我预计 下一次轮询 将在 结束 2 秒后触发当前一个;但是,只有禁用并行处理才能做到这一点。
其实下面是一个并行处理的输出示例:
09:02:56.859 Test route begin
09:02:58.086 Test route end
09:02:58.868 Test route begin
09:02:59.266 Test route end
09:03:00.870 Test route begin
09:03:01.654 Test route end
09:03:02.871 Test route begin
09:03:04.028 Test route end
09:03:04.873 Test route begin
这是一个没有并行处理的输出示例:
09:08:01.666 Test route begin
09:08:11.290 Test route end
09:08:13.292 Test route begin
09:08:21.707 Test route end
09:08:23.709 Test route begin
09:08:26.161 Test route end
09:08:28.162 Test route begin
09:08:37.761 Test route end
09:08:39.763 Test route begin
如您所见,在并行处理时,不考虑延迟,因为下一次轮询时间是相对于当前轮询开始计算的;另一方面,在没有并行处理的情况下,下一次轮询时间大约是当前轮询结束后的 2 秒。
这似乎有点奇怪,因为并行处理文档指出:
如果启用,则同时处理每个拆分的消息。请注意,调用者线程仍将等待,直到所有消息都已完全处理,然后才能继续。它只处理同时发生的拆分器中的子消息。
我在这里缺少什么?如何并行处理消息并在实际路由完成后触发下一次轮询?
【问题讨论】:
标签: parallel-processing apache-camel delay scheduler