【问题标题】:How do I handle RabbitMQ Consumer Cancellation Notification when using Spring ChannelAwareMessageListener使用 Spring ChannelAwareMessageListener 时如何处理 RabbitMQ 消费者取消通知
【发布时间】:2013-06-09 02:20:24
【问题描述】:

RabbitMQ 新手和 Java 新手。

我正在尝试编写一个侦听器,它将使用手动确认并使用 java Spring AMQP 抽象处理消费者取消通知。我可以使用 Spring 抽象来完成这两个任务吗?

我想编写一个侦听器,它将从队列中提取消息并处理该消息(可能写入数据库或其他东西)。我计划使用手动确认,以便如果消息处理失败或由于某种原因无法完成,我可以拒绝并重新排队。到目前为止,我想我发现为了使用 Spring AMQP 手动确认/拒绝/拒绝,我必须使用 ChannelAwareMessageListener

我意识到我应该处理来自 RabbitMQ 的消费者取消通知,但是使用 ChannelAwareMessageListener 我真的看不到为此编写代码的方法。我认为处理 CCN 的唯一方法是使用较低级别的 java 客户端 api 编写代码,方法是调用 channel.basicConsume() 并传递一个新的 DefaultConsumer 实例,该实例允许您处理消息传递和取消。

我也不知道如何在ConnectionFactory 上设置clientProperties(告诉代理我可以处理CCN),因为我是从配置中的bean 获取工厂的。

我的监听器和容器创建的伪代码如下。

public class MyChannelAwareListener implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        msgProcessed = processMessage(message);

        if(msgProcessed)    
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        else
           channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);  
    }
}

public static void main(String[] args) throws Exception
{
    ConnectionFactory rabbitConnectionFactory;
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext   (MY_CONTEXT_PATH);
    rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

    MyChannelAwareListener listener = new MyChannelAwareListener();
    container.setMessageListener(listener);
    container.setQueueNames("myQueue");
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.start();
}

【问题讨论】:

    标签: java rabbitmq amqp spring-amqp


    【解决方案1】:

    要设置客户端属性,您需要使用ConnectionFactory 中的setClientProperties 方法(假设此ConnectionFactory 是来自RabbitMQ Java 库的对象)。此方法需要一个Map<String, Object>,其中包含客户端的属性和功能。以下几行是 RabbitMQ Java 库中的默认值:

    Map<String,Object> props = new HashMap<String, Object>();
    props.put("product", LongStringHelper.asLongString("RabbitMQ"));
    props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
    props.put("platform", LongStringHelper.asLongString("Java"));
    props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
    props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
    
    Map<String, Object> capabilities = new HashMap<String, Object>();
    capabilities.put("publisher_confirms", true);
    capabilities.put("exchange_exchange_bindings", true);
    capabilities.put("basic.nack", true);
    capabilities.put("consumer_cancel_notify", true);
    
    props.put("capabilities", capabilities);
    

    对于管理 ACK 和消费者取消,我不确定如何使用 Spring AMQP 抽象来做到这一点,但是使用 channel.basicConsume 完全可以做到这一点,这使您可以通过所有回调方法处理所有场景:

    http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/

    希望这会有所帮助!

    【讨论】:

      猜你喜欢
      • 2012-06-28
      • 2016-05-04
      • 1970-01-01
      • 2015-05-27
      • 2014-07-23
      • 2020-04-21
      • 2015-10-03
      • 2017-07-31
      • 1970-01-01
      相关资源
      最近更新 更多