【问题标题】:Apache Camel Scheduler delay not respected while parallel processing并行处理时不考虑 Apache Camel 调度程序延迟
【发布时间】:2020-12-10 21:16:34
【问题描述】:

在使用 Apache Camel (3.6.0) 的 Spring Boot (2.3.0.RELEASE) 应用程序中,我遇到了一个奇怪的行为,并行模式下的调度程序组件;特别是,我想创建一个每 n 秒执行给定逻辑的路由。 为此,我编写了以下示例:

@Component
public class TestRoute extends RouteBuilder {
  @Override
  public void configure() {
    from("scheduler:testRoute?delay=2000")
      .log(LoggingLevel.INFO, "Test route begin")
      .setBody(this::generateRandomBody)
      .split(body())
        .parallelProcessing()
        .process(this::consumeElement)
      .end()
      .log(LoggingLevel.INFO, "Test route end");
}

private List<Integer> generateRandomBody(Exchange exchange) {
  return IntStream.range(0, random(20)).boxed().collect(toList());
}

private void consumeElement(Exchange exchange) throws InterruptedException {
  Thread.sleep(random(1000));
}

private int random(int max) {
  return new Random().nextInt(max) + 1;
}

scheduler.delay 参数设置为 2000,我预计 下一次轮询 将在 结束 2 秒后触发当前一个;但是,只有禁用并行处理才能做到这一点。

其实下面是一个并行处理的输出示例:

09:02:56.859    Test route begin
09:02:58.086    Test route end

09:02:58.868    Test route begin
09:02:59.266    Test route end

09:03:00.870    Test route begin
09:03:01.654    Test route end

09:03:02.871    Test route begin
09:03:04.028    Test route end

09:03:04.873    Test route begin

这是一个没有并行处理的输出示例:

09:08:01.666    Test route begin
09:08:11.290    Test route end

09:08:13.292    Test route begin
09:08:21.707    Test route end

09:08:23.709    Test route begin
09:08:26.161    Test route end

09:08:28.162    Test route begin
09:08:37.761    Test route end

09:08:39.763    Test route begin

如您所见,在并行处理时,不考虑延迟,因为下一次轮询时间是相对于当前轮询开始计算的;另一方面,在没有并行处理的情况下,下一次轮询时间大约是当前轮询结束后的 2 秒。

这似乎有点奇怪,因为并行处理文档指出:

如果启用,则同时处理每个拆分的消息。请注意,调用者线程仍将等待,直到所有消息都已完全处理,然后才能继续。它只处理同时发生的拆分器中的子消息。

我在这里缺少什么?如何并行处理消息并在实际路由完成后触发下一次轮询?

【问题讨论】:

    标签: parallel-processing apache-camel delay scheduler


    【解决方案1】:

    在 Camel 3 中,EIP 已被彻底改造为反应式,因此拆分器允许调度程序线程更快完成,因此可以延迟 2 秒。

    如果您希望调度等待整个交换完成(不管不同线程如何处理),那么您需要将其配置为同步。即使是骆驼 2。

     from("scheduler:testRoute?delay=2000&synchronous=true")
    

    【讨论】:

    • 现在它可以完美运行了!我不确定什么更棒:您的产品或您难以置信的及时回复:-) 非常感谢
    猜你喜欢
    • 1970-01-01
    • 2013-08-08
    • 1970-01-01
    • 2019-03-05
    • 2013-08-06
    • 1970-01-01
    • 2018-01-14
    • 2018-09-21
    • 1970-01-01
    相关资源
    最近更新 更多