【问题标题】:Camel aggregation strategy骆驼聚合策略
【发布时间】:2014-10-13 18:43:17
【问题描述】:

我正在解析一个 CSV 文件,将其拆分并通过骆驼中的多个处理器进行路由。有两个端点,一个具有错误数据,而另一个具有经过验证的数据。

在汇总数据时我需要建议。

假设 CSV 文件有 10 条记录,其中 6 条到达一个端点,4 条到达另一个端点。我如何知道是否所有 10 都已从每个端点的文件中完成并在聚合器之前移动。 我需要创建两个文件,一个包含有效数据,另一个包含来自单个文件的损坏数据。

【问题讨论】:

    标签: java apache-camel


    【解决方案1】:

    让我们看看拆分器返回什么。

    根据 Camel 2.2 上的文档。或更旧的拆分器将默认使用您的示例返回最后一条拆分消息,这可能是完成其处理器的最后一行,因此它可能不是第 10 行(使用您的示例)。

    在 Camel 2.3 和更新版本上,默认情况下拆分器将返回原始输入消息,即所有 10 行。这是默认行为,您不需要为此编写任何代码。当拆分器默认完成时,它将将此消息传递到下一个端点。

    如果我在 Camel 2.3 或更高版本上使用以下 DSL:

    <camelContext trace="false" id="blueprintContext" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="splitExample">
        <from uri="timer://myTimer?period=2000"/>
        <setBody>
            <simple>A\nB\nC</simple>
    
        </setBody>
    
        <log message="The message body before the splitter contains ${body}"/>
        <split>
            <tokenize token="\n"></tokenize>
    
            <log message="Split line ${body}"/>
        </split>
        <log message="The message body after the splitter contains ${body}"/>
    </route>
    </camelContext>  
    

    日志中会出现以下内容:

     INFO  The message body before the splitter contains 
           A
           B
           C
     INFO  Split line A
     INFO  Split line B
     INFO  Split line C
     INFO  The message body after the splitter contains 
           A
           B
           C
    

    正如您所见,默认情况下,在拆分器返回后,camel 会将消息合并为一个。要覆盖此行为,您需要实现自己的聚合器。为此,创建一个类,我们将其命名为MyAggregationStrategy,并使该类实现AggregationStrategy。我使用了here 的 apache 文档中的示例。例如,我们将汇总传入的出价并希望汇总最高出价。

    private static class MyAggregationStrategy implements AggregationStrategy {
    
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
        {
            if (oldExchange == null) 
            { 
               // the first time we only have the new exchange so it wins the first round
               return newExchange;
            }
            int oldPrice = oldExchange.getIn().getBody(Integer.class);
            int newPrice = newExchange.getIn().getBody(Integer.class);
            // return the "winner" that has the highest price
            return newPrice > oldPrice ? newExchange : oldExchange;
        }
    }
    

    完成此操作后,您可以通过执行以下操作告诉拆分器使用您的聚合器:

    Spring/XML DSL:

    <split  strategyRef="MyAggregationStrategy ">
    

    在 Java 中:

    from("direct:start")
    // aggregated by header id and use our own strategy how to aggregate
    .aggregate(new MyAggregationStrategy())
    

    希望这能让您充分了解分离器的工作原理。在您的情况下,我可能会为每一行设置一个标头值,指示它是成功还是失败,然后我将使用我的客户聚合器创建一条新消息,其中失败和成功分组为两个列表作为消息正文。一份包含失败的列表,一份包含已完成行项目的列表。

    然后可以将这个新的聚合消息发送到处理器或另一个端点以进行进一步处理。例如,您可以获取失败的列表并将其发送到生成文件的路由。 seda 组件在这里可以提供很多帮助。

    【讨论】:

    • 感谢@Namphibian 的详细回复。我对分离器进行了更深入的研究,并遵循了您的建议,现在一切正常。将其聚合为 List> 后,我使用了自定义处理器,该处理器使用 ProducerTemplate 将文件路由到不同的目的地。再次感谢!!!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多