【问题标题】:Camel JMS Component and Parallel ProcessingCamel JMS 组件和并行处理
【发布时间】:2012-10-14 04:47:47
【问题描述】:

我有一个队列,其中有多条消息,都是独立的,可以并行处理。

我见过一个多播路由器,它接收相同的消息并将其拆分到多个接收器,我想我还尝试了其他一些方法,例如节流飞行路由策略、线程池配置文件等。

但是,我确实想要一个路由来封装多个并行 JMS 会话,我希望配置和一次性处理所有消息。

我想陈述我的一个假设,即带有 from 的单个路由意味着单个会话而不是 n 个并行会话。如果我错了,请纠正我。

我的骆驼上下文看起来有点像这样:

<bean id="throttlePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
    <!-- define the scope to be context scoped so we measure against total 
        inflight exchanges that means for both route1, route2 and route3 all together -->
    <property name="scope" value="Context" />
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
    <property name="maxInflightExchanges" value="20" />
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume 
        the routes the default percentage is 70% but in this demo we want a low value -->
    <property name="resumePercentOfMax" value="10" />
    <!-- output throttling activity at WARN level -->
    <property name="loggingLevel" value="WARN" />
</bean>

<camelContext id="camel" errorHandlerRef="dlc"
    xmlns="http://camel.apache.org/schema/spring">

    <!-- this is the Dead Letter Channel error handler, where we send failed 
        message to a log endpoint -->
    <errorHandler id="dlc" type="DeadLetterChannel"
        deadLetterUri="jms:deadLetterQueue">
        <redeliveryPolicy retryAttemptedLogLevel="INFO"
            maximumRedeliveries="3" redeliveryDelay="250" backOffMultiplier="2"
            useExponentialBackOff="true" />
    </errorHandler>

    <route routePolicyRef="throttlePolicy">
        <from uri="jms:camel.design.md5InputQueue" />
        <transacted ref="required" />
        <process ref="basicProcessor" />
        <to uri="jms:camel.integrationTest.reply" />
    </route>
</camelContext>

如您所见,我正在做的是计算源的 MD5。我希望这样做并将结果传输到回复队列中,并让所有这些都并行完成。

为了模拟这一点,我在基本处理器中设置了一个睡眠(一秒),我看到的是一个又一个消息的顺序处理,而不是并行处理。例如,如果有 10 条消息,则需要 10 秒,如果有 20 条消息,则需要 20 秒等。

如何让它并行工作并进行所有 MD5 计算,比如在处理器中设置睡眠条件后,大约 2 秒内完成 10 条输入消息。

【问题讨论】:

    标签: parallel-processing jms activemq apache-camel


    【解决方案1】:

    只需设置maxConcurrentConsumers 属性即可启用队列中的多个消费者线程...

    <route routePolicyRef="throttlePolicy">
        <from uri="jms:camel.design.md5InputQueue?maxConcurrentConsumers=5" />
        <transacted ref="required" />
        <process ref="basicProcessor" />
        <to uri="jms:camel.integrationTest.reply" />
    </route>
    

    【讨论】:

    • 感谢 Boday,这确实奏效了,一个小小的调整也让它变得更好,或者说是我真正想要的。我设置了 concurrentConsumers ,这就是我想要的。
    猜你喜欢
    • 1970-01-01
    • 2010-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多