【问题标题】:Throttling based on content基于内容的节流
【发布时间】:2012-09-04 08:07:02
【问题描述】:

我想知道 Camel 是否可以根据交换的内容进行节流。

情况如下:我必须通过soap调用一个webservice。其中,发送给那个webservice的参数有一个customerId。问题是,如果给定的 customerId 每分钟有超过 1 个请求,Web 服务会发回错误。

我想知道是否可以使用 Camel 实现每个 customerId 的限制。所以不应该对所有消息都进行限制,而应该只对具有相同 customerId 的消息进行限制。

如果我需要澄清我的问题,请告诉我如何实施。

【问题讨论】:

  • 在现有的 Throttler EIP 中添加对组的支持实际上是个好主意。也就是说,尽管消息在等待释放时会保留在内存中。让我为这个增强记录一个 JIRA。
  • 这里只是为了记录是票:issues.apache.org/jira/browse/CAMEL-5599
  • @ClausIbsen - 已经有一段时间了,但我会支持该功能!

标签: apache-camel


【解决方案1】:

ActiveMQ Message Groups 旨在处理这种情况。因此,如果您可以在路由中引入 JMS 队列跃点,那么只需将 JMSXGroupId 标头设置为 customerId。然后在另一条路线中,您可以从此队列中消费并发送到您的网络服务以获取您描述的行为。

另请参阅http://camel.apache.org/parallel-processing-and-ordering.html 了解更多信息...

【讨论】:

    【解决方案2】:

    虽然 ActiveMQ 消息组肯定会解决唯一客户 ID 的并行处理,但在我的评估中,Claus 是正确的,即为每个唯一组引入限制代表 Camel/ActiveMQ 未实现的功能。

    单独的消息组将无法满足所描述的 SLA。虽然每组消息(与客户 ID 相关)将按顺序处理,每组一个线程,只要请求不到一分钟即可收到响应,则不会强制执行每个客户每分钟一个请求的要求.

    也就是说,我很想知道是否可以将消息组和节流策略结合起来,以模拟 JIRA 中的功能请求。到目前为止,我的尝试都失败了。我当时在想一些事情:

    <route>
      <from uri="activemq:pending?maxConcurrentConsumers=10"/>
      <throttle timePeriodMillis="60000">
        <constant>1</constant>
        <to uri="mock:endpoint"/>
      </throttle>
    </route>
    

    但是,限制似乎适用于移动到端点的整个请求集,而不是适用于每个单独的消费者。我不得不承认,我有点惊讶地发现这种行为。我的期望是限制将单独应用于每个消费者,这将满足原始问题中的 SLA,前提是消息在 JMSXGroupId 标头中包含客户 ID。

    【讨论】:

      【解决方案3】:

      我遇到了类似的问题,最后想出了这里描述的解决方案。

      我的假设是:

      • 消息的顺序并不重要(尽管可以通过重新排序器解决)
      • 每个客户 ID 的消息总量不是很大,因此运行时没有饱和。

      解决办法:

      • 运行聚合器 1 分钟,同时使用 customerID 将具有相同客户 ID 的消息组装到列表中
      • 使用 Splitter 将列表拆分为单独的消息
      • 将第一条消息从拆分器发送到实际服务
      • 将列表的其余部分重新路由回聚合器。

      Java DSL 版本更容易理解:

      final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
              .accumulateInCollection(ArrayList.class);
      
      from("direct:start")
          .log("Receiving ${body}")
          .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
              .log("Aggregate: releasing ${body}")
              .split(body())
              .choice()
                  .when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
                      .log("*** Processing: ${body}")
                      .to("mock:result")
                  .otherwise()
                    .to("seda:delay")
              .endChoice();
      
      from("seda:delay")
          .delay(0)
          .to("direct:start");
      

      Spring XML 版本如下所示:

       <!-- this is our aggregation strategy defined as a spring bean -->
       <!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
       <bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
       <bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
           <constructor-arg value="java.util.ArrayList" />
       </bean>
      
      <camelContext xmlns="http://camel.apache.org/schema/spring">
             <route>
                 <from uri="direct:start"/>
                 <log message="Receiving ${body}"/>
                 <aggregate strategyRef="_flexible2" completionTimeout="60000" >
                     <correlationExpression>
                         <xpath>/order/@customerID</xpath>
                     </correlationExpression>
                     <log message="Aggregate: releasing ${body}"/>
                     <split>
                         <simple>${body}</simple>
                         <choice>
                             <when>
                                 <simple>${header.CamelSplitIndex} == 0</simple>
                                 <log message="*** Processing: ${body}"/>
                                 <to uri="mock:result"/>
                             </when>
                             <otherwise>
                                 <log message="--- Delaying: ${body}"/>
                                 <to uri="seda:delay" />
                             </otherwise>
                         </choice>
                     </split>
                 </aggregate>
             </route>
      
             <route>
                 <from uri="seda:delay"/>
                 <to uri="direct:start"/>
             </route>
      </camelContext>
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-27
        • 2012-12-23
        • 2021-03-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多