【问题标题】:Camel Multicast Subroutes out of order骆驼组播子路由乱序
【发布时间】:2014-10-19 00:43:52
【问题描述】:

我有一个场景,我得到消息 A 作为输入。然后消息 A 必须拆分为 3 种不同类型的消息,并转发到其他路由。重要的是消息以精确的顺​​序到达,即。 A-1 必须在 A-2 之前发送,A-2 必须在 A-3 之前发送。

为此,我做了以下(大纲):

from("activemq:queue:somequeue-local")
  .multicast().to("direct:a1","direct:a2","direct:a3");

from("direct:a1)
  //split incoming message and prepare output document for A-1
  .to("activemq:queue:otherqueue")

.from("direct:a2)
  //split incoming message and prepare output document for A-2
  .to("activemq:queue:otherqueue")

.from("direct:a3)
  //split incoming message and prepare output document for A-3
  .to("activemq:queue:otherqueue")

在另一种情况下,负责将信息发送到外部系统,我有

.from("activemq:queue:otherqueue?maxMessagesPerTask=1&concurrentConsumers=1&maxConcurrentConsumers=1")
      // do different stuff based on which type we are called with then end with
     .beanref("somebean","writeToFileAndCallImportbat");

现在,我的问题是,当我到达接收者时,我会以随机顺序收到消息。有时是 A-1,A-3,A-2,有时是正确的,A-1,A-2,A-3。

我尝试将 JMSXGroupID 和 JMSXGroupSeq 添加到消息中,但没有任何运气。

我也尝试过完全跳过 MQ 部分,并使用 direct-vm: 调用共享接收器,但看起来我一次同时调用了三个接收器,并且仍然以随机执行顺序。

我的印象是多播会按顺序运行,除非另有提示?

所采用的方法是否存在根本问题?

我使用的是 Camel 2.12 版。

或者,更直白地说:

  • 我想要一个创建三个不同输出消息并按顺序在它们上执行批处理文件的路由。我该怎么做?

【问题讨论】:

    标签: apache-camel activemq multicast


    【解决方案1】:

    如果您使用Splitter pattern,您是否检查过 streaming 属性是否设置为 false。

    如果启用,Camel 将以流式方式拆分,这意味着它将输入消息拆分为块。这减少了内存开销。例如,如果您拆分大消息,则建议启用流式传输。如果启用流式传输,则子消息回复将无序聚合,例如按照它们返回的顺序。如果禁用,Camel 将以与拆分时相同的顺序处理子消息回复。

    【讨论】:

    • 你完全正确。问题在于我如何处理拆分,尽管不完全出于您所说的原因。但是由于我的示例没有显示详细信息,因此无法查看。
    【解决方案2】:

    所以,结果证明这毕竟不是多播的问题。

    相反,在我的每个子路线中,我都这样做了:

    .split(..stax(SpecialClass)).streaming()
    .beanRef("transformationBean","somefunction")
    .aggregate(constant("1"), new MyAggregator())
    .completionTimeout(5000)
    .completionSize(1000)
    .to(writeToFileAndRunBat)
    

    其中,我假设的意思是“处理拆分中的所有元素,如果您在 5 秒内或在 1000 个元素后没有完成,则退出”。

    我改成

    .split(..stax(SpecialClass), , new MyAggregator()).streaming()
    .beanRef("transformationBean","somefunction")
    .end()
    .to(writeToFileAndRunBat)
    

    仔细想想,这很有意义,因为第一个版本无法真正知道我们何时完成,而最后一个版本(我假设)只是遍历拆分中的所有元素并为每个元素调用聚合器。

    另外,我必须在第一个版本中使用 .end()。所以我猜整个事情只是随机的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多