【问题标题】:Persist message in topic after server restart服务器重新启动后在主题中保留消息
【发布时间】:2016-01-13 18:48:40
【问题描述】:

我正在学习 Spring Integration JMS。我遇到了一个问题,即我的主题没有保留尚未被客户端使用的待处理消息。

基本上我启动 ActiveMQ 然后使用 REST 客户端我调用生产者发送消息 50 次,以便 50 条消息在主题中排队。在消费者端,我应用了 5 秒的睡眠计时器,以便每条消息以 5 秒的固定间隔被消耗。然后在这之间我停止了 ActiveMQ。同时,客户端消费了一些消息,假设 50 条消息中有 15 条已被消费。然后,如果我重新启动 ActiveMQ,我希望主题会保留 35 条待处理的消息,但我在管理控制台中的主题选项卡下看不到。

这是我的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
        automagically fines the configured connectionFactory bean (by naming convention
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

我还读到默认模式是持久的。但就我而言,它似乎不起作用。

编辑:

根据Gary Russel在添加属性后给出的答案

  • subscription-durable="true"
  • durable-subscription-name="mySubscription"

&lt;int-jms:message-driven-channel-adapter&gt; 我遇到了与 XML 相关的问题

  • cvc-complex-type.3.2.2:属性“subscription-durable”不允许出现在元素“int-jms:message-driven-channel-adapter”中。

  • cvc-complex-type.3.2.2:属性“durable-subscription-name”不允许出现在元素“int-jms:message-driven-channel-adapter”中。

请帮忙

【问题讨论】:

  • 到目前为止你做了什么来尝试解决你的问题?
  • 我在activemq.xml的标签中添加了persistent=true
  • 进一步我尝试在从频道发送消息时像这样设置MessageBuilder .withPayload(ticket) .setHeader(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.PERSISTENT) .build();

标签: java jms activemq spring-integration


【解决方案1】:

这就是主题的工作方式,默认情况下,请阅读 JMS 规范。

主题是发布/订阅;只有在场的订阅者才能收到消息。

如果发布5,启动消费者,再发布5;他只会得到第二个 5。

如果你在经纪人得到全部 5 之前杀死他;在重新启动期间,代理发现没有消费者,因此他清除了消息。

您可以通过使用持久订阅来更改此行为,在这种情况下,即使当前未连接,代理也会确实为每个此类订阅保留消息。

要使用 Spring Integration 进行配置,请在消息驱动的通道适配器上设置 subscription-durable 并为其指定唯一的 subscription-name

【讨论】:

  • 感谢您的回答。它确实有帮助。但一个问题是我是否需要除您提到的配置之外的任何其他配置,或者只需在具有唯一 subscription-name 的消息驱动通道适配器上添加 subscription-durable 即可。
  • 你的意思是&lt;int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="getPayloadChannel" destination="destination" subscription-durable="testJmsSubscription" pub-sub-domain="true" /&gt; 对吧?
  • 不要将代码/配置放在 cmets 中;它呈现不佳 - 改为编辑您的问题。请参阅属性说明 subscription-durable="true" subscription-name="foo" 应该是你所需要的。
  • 嗨,加里,感谢您的回复。我检查了属性并添加到我的配置中,但现在我的 xml 给了我错误。我只是在编辑那里提到的错误问题。
  • 请检查上面有问题的编辑。
【解决方案2】:

Activemq 中的主题不是持久的和持久的,所以万一你的一个消费者宕机了。你会丢失你的消息。

要使主题持久且持久,您可以通过为每个消费者创建唯一的客户端 ID 来创建持久消费者。

但同样,如果您遵循微服务架构,这不是分布式的。因此,多个 pod 或副本在消费消息时会产生问题,因为持久消费者无法实现负载平衡。

为了缓解这种情况,Activemq 中有一个虚拟主题选项。下面提供了更多详细信息,

您可以在名为 VirtualTopic.MyTopic. 的主题中通过您的制作人发送您的消息。 ** 注意:对于默认的 activemq 配置,您必须遵循此命名约定。但是是的,还有一种方法可以覆盖此命名约定。

现在,要通过多个消费者(此处为 A 和 B)使用您的消息,您还必须为您的消费者端目的地设置命名约定,例如。 Consumer.A.VirtualTopic.MyTopic Consumer.B.VirtualTopic.MyTopic 这两个消费者将通过上面创建的主题接收消息,并且在相同的多个副本之间启用负载平衡消费者。

我希望这将帮助您解决有关 activemq 主题的问题。

【讨论】:

    猜你喜欢
    • 2016-01-12
    • 2018-05-25
    • 2012-05-07
    • 1970-01-01
    • 2012-12-29
    • 2011-04-21
    • 2018-04-21
    • 2013-03-16
    相关资源
    最近更新 更多