让我们看看拆分器返回什么。
根据 Camel 2.2 上的文档。或更旧的拆分器将默认使用您的示例返回最后一条拆分消息,这可能是完成其处理器的最后一行,因此它可能不是第 10 行(使用您的示例)。
在 Camel 2.3 和更新版本上,默认情况下拆分器将返回原始输入消息,即所有 10 行。这是默认行为,您不需要为此编写任何代码。当拆分器默认完成时,它将将此消息传递到下一个端点。
如果我在 Camel 2.3 或更高版本上使用以下 DSL:
<camelContext trace="false" id="blueprintContext" xmlns="http://camel.apache.org/schema/blueprint">
<route id="splitExample">
<from uri="timer://myTimer?period=2000"/>
<setBody>
<simple>A\nB\nC</simple>
</setBody>
<log message="The message body before the splitter contains ${body}"/>
<split>
<tokenize token="\n"></tokenize>
<log message="Split line ${body}"/>
</split>
<log message="The message body after the splitter contains ${body}"/>
</route>
</camelContext>
日志中会出现以下内容:
INFO The message body before the splitter contains
A
B
C
INFO Split line A
INFO Split line B
INFO Split line C
INFO The message body after the splitter contains
A
B
C
正如您所见,默认情况下,在拆分器返回后,camel 会将消息合并为一个。要覆盖此行为,您需要实现自己的聚合器。为此,创建一个类,我们将其命名为MyAggregationStrategy,并使该类实现AggregationStrategy。我使用了here 的 apache 文档中的示例。例如,我们将汇总传入的出价并希望汇总最高出价。
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
{
if (oldExchange == null)
{
// the first time we only have the new exchange so it wins the first round
return newExchange;
}
int oldPrice = oldExchange.getIn().getBody(Integer.class);
int newPrice = newExchange.getIn().getBody(Integer.class);
// return the "winner" that has the highest price
return newPrice > oldPrice ? newExchange : oldExchange;
}
}
完成此操作后,您可以通过执行以下操作告诉拆分器使用您的聚合器:
Spring/XML DSL:
<split strategyRef="MyAggregationStrategy ">
在 Java 中:
from("direct:start")
// aggregated by header id and use our own strategy how to aggregate
.aggregate(new MyAggregationStrategy())
希望这能让您充分了解分离器的工作原理。在您的情况下,我可能会为每一行设置一个标头值,指示它是成功还是失败,然后我将使用我的客户聚合器创建一条新消息,其中失败和成功分组为两个列表作为消息正文。一份包含失败的列表,一份包含已完成行项目的列表。
然后可以将这个新的聚合消息发送到处理器或另一个端点以进行进一步处理。例如,您可以获取失败的列表并将其发送到生成文件的路由。 seda 组件在这里可以提供很多帮助。