【问题标题】:Avoiding duplicated messages on JMS/ActiveMQ避免 JMS/ActiveMQ 上的重复消息
【发布时间】:2011-06-23 11:49:36
【问题描述】:

有没有办法抑制 ActiveMQ 服务器上定义的队列上的重复消息?

我尝试手动定义 JMSMessageID,(message.setJMSMessageID("uniqueid")),但服务器忽略了此修改并使用内置生成的 JMSMessageID 传递消息。

按照规范,我没有找到有关如何删除重复消息的参考。

在 HornetQ 中,为了解决这个问题,我们需要在消息定义中声明 HQ 特定的属性 org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

即:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

有人知道 ActiveMQ 是否有类似的解决方案?

【问题讨论】:

    标签: java duplicates jms activemq mom


    【解决方案1】:

    你应该看看 Apache Camel,它提供了一个可以与任何 JMS 提供者一起工作的幂等消费者组件,请参阅:http://camel.apache.org/idempotent-consumer.html

    将它与 ActiveMQ 组件结合使用使得 JMS 的使用变得非常简单,请参阅: http://camel.apache.org/activemq.html

    【讨论】:

    • 我怀疑这种方法是否能解决我的问题。在此实例在队列中期间,我只需要保留一个具有相同 JMSMessageID 的消息实例。我需要它作为一个集合工作。在从队列中删除最新的同上元素后,我希望能够使用相同的 JMSMessageID 放置其他消息。我需要实现它并进行测试。但是,基于 EAI 书中描述的 Idempotent,我认为这个概念与我的必要性不符。但是,建议的解决方案很好。我将对此进行更多研究并在这里评论我的结果。谢谢
    【解决方案2】:

    我怀疑 ActiveMQ 是否原生支持它,但实现幂等消费者应该很容易。一种方法是在生产者端为每条消息添加一个唯一标识符,现在在消费者端使用存储(数据库、缓存等),可以检查该消息之前是否已收到,并且根据该检查继续处理。

    我看到了之前的 * 问题 - Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages? ,这也可能有所帮助。

    【讨论】:

    • 由于消费者本身可以是多线程的,因此要识别其是否重复,必须实现分布式/内存锁定。对吗?
    【解决方案3】:

    现在支持删除嵌入到 ActiveMQ 传输中的重复消息。查看Connection Configuration Guide中的配置值auditDepthauditMaximumProducerNumber

    【讨论】:

    • 您如何实际配置这些参数以避免重复?
    • @Thomas 我不确定你在问什么。一般来说,如何在 ActiveMQ 中应用配置?或者为这些特定字段使用什么值?
    • 只是从参数的描述来看,我听的不是很清楚。 auditDepth 例如,那里的值是指消息的 Nb 还是将被屏蔽以进行重复的字节的 nb?关于auditMaximumProducerNumber,这是否意味着将筛选的生产者数量有限?顺便说一句,如果一条内容相同的消息由 2 个不同的订阅者发布,那么这条消息是否会被认为是重复的?
    • @Chris IIUC 这些参数保证检测到多达 64 个生产者的 2048 条消息的重复项。但是 ActiveMQ 如何确定什么是重复的呢?如果它是由 JMSMessageID 设置的,那么我们将回到第一方,因为我们无法设置它。
    【解决方案4】:

    有一种方法可以让 ActiveMQ 根据 JMS 属性过滤重复项。它涉及编写一个 Activemq Plugin。将重复消息发送到死信队列的基本代理过滤器将是这样的

    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.activemq.broker.Broker;
    import org.apache.activemq.command.Message;
    import org.apache.activemq.command.ActiveMQMessage;
    import org.apache.activemq.broker.BrokerFilter;
    import org.apache.activemq.broker.ConnectionContext;
    import org.apache.activemq.command.ConnectionInfo;
    import org.apache.activemq.broker.ProducerBrokerExchange;
    
    public class DuplicateFilterBroker extends BrokerFilter {
        String messagePropertyName;
        boolean switchValue;
    
        public DuplicateFilterBroker(Broker next, String messagePropertyName) {
            super(next);
            this.messagePropertyName = messagePropertyName;
        }
    
        public boolean hasDuplicate(String propertyValue){
            switchValue = propertyValue;
            return switchValue;
        }
    
        public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
            ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
            Object msgObj = msg.getMessage(); 
            if (msgObj instanceof javax.jms.Message) { 
                javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
                if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) {
                    super.send(producerExchange, msg);
                }
                else {
                   sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
                } 
            }
        }  
    }
    

    【讨论】:

    • 这个插件如何决定使用哪个属性来进一步过滤重复消息,关于用例的解释对于集成这样的插件非常有帮助。提前感谢您的回答。
    【解决方案5】:

    似乎问题中建议的方式也适用于 ActiveMQ(2016/12)。请参阅activemq-artemis 指南。这需要生产者在消息中设置特定属性。

    Message jmsMessage = session.createMessage();
    String myUniqueID = "This is my unique id";   // Could use a UUID for this
    message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
    

    但是包含该属性的类是不同的: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID,属性值为_AMQ_DUPL_ID

    【讨论】: