【问题标题】:How to delay a JMS message in queue before a listener receives it using camel?如何在侦听器使用骆驼接收JMS消息之前延迟队列中的JMS消息?
【发布时间】:2016-02-06 06:43:21
【问题描述】:

我正在尝试将消息延迟几秒钟以使其排队。 但是当我使用骆驼延迟选项时,它不是在队列中延迟,而是立即被消耗,并在路由路径中延迟。 我们如何才能延迟消息,以便它们在队列中等待几秒钟?

我的带有骆驼配置的弹簧如下所示。

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">  

    <route id="routeOne" delayer="10000">               
        <from uri="jms://queueone?concurrentConsumers=1"/>          
        <log message="routeOne incoming message ${body}"/>              
        <delay><constant>30000</constant></delay>                       
        <process ref="loggerProcessor"/>                        
    </route>

</camelContext> 

<bean id="loggerProcessor" name="loggerProcessor" class="emh.LoggerProcessor"/>

【问题讨论】:

  • 一些 JMS 实现支持延迟消息传递。你用的是哪一个?

标签: jms apache-camel


【解决方案1】:

Camel 内置了对 Throttler 模式的支持,有一个节流器组件。参考:http://camel.apache.org/throttler.html

只需将以下内容添加到路由中,这应该会延迟消息。

<throttle timePeriodMillis="30000">

【讨论】:

  • throttle 正在从 Queue 中获取消息并将其保留在路由交换中并等待延迟。如果我们在延迟之前关闭应用程序,那么我们将丢失消息
【解决方案2】:

骆驼延迟

Camel delaythrottle 将从 ActiveMQ 队列中移除(使用)消息,并将其(在内存中)保留在路由中直到处理。 Transacted JMS 可能会缓解有关丢失消息的问题,但尚未尝试过。 Food for thought


通过 ActiveMQ 延迟

我让它在 Spring Boot 2 中与 Camel 和 ActiveMQ 一起使用 AMQ_SCHEDULED_DELAY 标头并启用 schedulerSupport [1, 2, 3, 4, 5, @ 987654327@]。这是每个JMS 2.0 Delivery Delay

请注意,只有在没有活动消费者时,消息才会出现在 ActiveMQ 队列中(待处理) - 即应用程序关闭或 Camel 路由禁用。以下配置用于使用现有 ActiveMQ 代理。

application.properties (source)

spring.activemq.broker-url=tcp://localhost:61616

RouteBuilder

from("jms://incomingQueue").setHeader("AMQ_SCHEDULED_DELAY", constant("1000")).inOnly("jms://delayedQueue");  

from("jms://delayedQueue").process(...);

activemq.xml(现有代理安装)

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">  

对于在/usr/local/Cellar/activemq/&lt;version&gt;/libexec/conf 下找到的 OSX Homebrew。


如果您想在您的应用程序中使用嵌入的代理,您可以使用BrokerService

默认情况下,数据保存在 activemq-data 中,并且需要 activemq-kahadb-store 依赖项 - 如果您选择 JDBC (source) 也是如此。使用brokerService.setPersistent( false ) 不再需要存储依赖项,但随后JMS 消息将再次存储在内存中。

@Bean
public BrokerService brokerService()
{
    BrokerService brokerService = new BrokerService();
    brokerService.setSchedulerSupport( true );
    return brokerService;
}

更多示例请见herehere

【讨论】:

  • 请注意,Camel 2.18.1 似乎是与 Spring Boot 2 一起使用的最后一个版本,计划为 Camel 2.22.0 (link) 提供官方支持。
猜你喜欢
  • 1970-01-01
  • 2011-10-09
  • 2017-03-15
  • 2012-01-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多