【问题标题】:Am I using Apache Camel aggregator correctly?我是否正确使用 Apache Camel 聚合器?
【发布时间】:2014-01-01 15:21:47
【问题描述】:

我对 route(用 Apache Camel 术语)的理解是它表示从一个端点到另一个端点的数据流,并且它将在不同的 处理器沿途对数据执行 EIP 类型的操作。

如果这是对路线的正确/公平评估,那么我正在模拟一个问题,我相信需要同一 CamelContext 内的多条路线(我使用的是 Spring): p>

  1. Route 1:从 Source-1 中提取数据,对其进行处理,将其转换为 List<SomePOJO>,然后将其发送到聚合器
  2. Route 2:从 Source-2 中提取数据,对其进行处理,还将其转换为 List<SomePOJO>,然后将其发送到聚合器
  3. Route 3:包含一个聚合器,等待它从 Route 1 和 Route 2 收到 List<SomePOJO>,然后继续处理聚合列表

事情是这样的:两个List<SomePOJO>s 需要同时到达聚合器,或者更确切地说,聚合器 bean 必须等待直到它收到数据在它可以将 2 个列表聚合到单个 List<SomePOJO> 并将聚合列表发送到 Route 3 的其余部分之前,从两条路由中提取。

到目前为止,我有以下伪编码<camelContext>

<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 2 -->
    <route id="route-2">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />

        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>

<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />

然后在 Java 中:

public class ListAggregatorStrategy implements AggregatoryStrategy {
    public Exchange aggregate(Exchange exchange) {
        List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
        List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);

        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
        aggregateList.addAll(route2POJOs);

        return aggregateList;
    }
}

我的问题

  1. 我的基本设置是否正确?换句话说,我是否正确使用direct:aggregator 端点将数据从route-1route-2 发送到route-3 的聚合器?
  2. 我的聚合器会按照我期望的方式工作吗?假设 route-1 中的 extractor1 bean 只需 5 秒即可运行,但 route-2 中的 extractor2 bean 需要 2 分钟才能运行。在 t=5 时,聚合器应该从 extractor1 接收数据并开始等待(2 分钟)直到 extractor2 完成并将其余数据提供给聚合器。是吗?

【问题讨论】:

    标签: java apache-camel routes aggregator enterprise-integration


    【解决方案1】:

    听起来你是在正确的轨道上,Aggregator 页面有很多关于这方面的好信息。

    &lt;correlationExpression&gt; 是匹配每个路由的 Exchange 的关键,completionSize 可以指定等待的数量。在您的情况下,看起来每条路由仅设计为运行一次,在这种情况下,表达式可能使用来自每个 Exchange 的固定标头值,否则您需要为每条路由设置一个计数器类之类的东西。

    以下是您的示例的更新:

    <route id="route-1">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
        <to uri="bean:extractor1?method=process" />
        <setHeader headerName="id">
            <constant>myHeaderValue</constant>
        </setHeader>
        <to uri="direct:aggregator" />
    </route>
    
    <route id="route-2">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
        <to uri="bean:extractor2?method=process" />
        <setHeader headerName="id">
            <constant>myHeaderValue</constant>
        </setHeader>
        <to uri="direct:aggregator" />
    </route>
    
    <route id="route-3">
        <from uri="direct:aggregator" />
    
        <aggregate strategyRef="listAggregatorStrategy" completionSize="2">
            <correlationExpression>
                <simple>header.id</simple>
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-10
      • 2023-03-26
      • 2015-11-22
      • 2023-03-08
      • 2012-07-28
      相关资源
      最近更新 更多