【问题标题】:RabbitMQ messages are not being dequeued upon consumptionRabbitMQ 消息在消费时不会出队
【发布时间】:2019-12-08 20:29:37
【问题描述】:


我需要创建一个 RabbitMQ 消息传递系统,将消息发送给特定用户,其中一条消息总是针对一个用户。

每个用户都应该有自己的动态创建的消息队列和 DLQ。 当用户拒绝一条消息时,它应该被移动到他的 DLQ,在那里它将等待 10 秒,然后返回到常规队列。
每个部分都在工作,除了消息在接受或拒绝时不会从常规队列中删除。

配置:

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=true
spring.rabbitmq.listener.simple.retry.enabled=false

消息服务:

@Service
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
public class RabbitService {

    public static final String EXCHANGE_NAME = "my-exchange";
    public static final String QUEUE_NAME_PREFIX = "my-queue.";

    private final RabbitTemplate rabbitTemplate;
    private final AmqpAdmin amqpAdmin;

    @Autowired
    public RabbitService(
            RabbitTemplate rabbitTemplate,
            AmqpAdmin amqpAdmin
    ) {
        this.rabbitTemplate = rabbitTemplate;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Initializes exchange with name {@link EXCHANGE_NAME} if it does not exist.
     */
    @PostConstruct
    public void init() {
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
        amqpAdmin.declareExchange(exchange);
    }

    /**
     * Sends the {@param message} to user with id {@param userId}.
     */
    public void send(@NotNull String message, long userId) {
        String queueName = QUEUE_NAME_PREFIX + userId;

        declareQueuesIfNecessary(queueName);

        rabbitTemplate.convertAndSend(EXCHANGE_NAME, queueName, message);
    }

    /**
     * Declares a new {@link Queue} with the specified {@param queueId} and a DLQ if they do not exist.
     *
     * @param queueId the queue identifier
     */
    private void declareQueuesIfNecessary(@NotNull String queueName) {
        String resendQueueName = "resend." + queueName;

        Map<String, Object> args;

        args = new HashMap<>();

        // Args for the resend queue. It should have a TTL of 10 seconds, after which it will be moved to the regular queue.
        args.put("x-message-ttl", 10000L);
        args.put("x-dead-letter-exchange", EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key", queueName);
        amqpAdmin.declareQueue(new Queue(resendQueueName, true, false, true, args));
        amqpAdmin.declareBinding(new Binding(
                resendQueueName,
                Binding.DestinationType.QUEUE,
                EXCHANGE_NAME,
                resendQueueName,
                null
        ));

        // Args for the regular queue. When messages are rejected, they should move to the resend queue.
        args = new HashMap<>();
        args.put("x-dead-letter-exchange", EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key", resendQueueName);
        amqpAdmin.declareQueue(new Queue(queueName, true, false, true, args));
        amqpAdmin.declareBinding(new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                EXCHANGE_NAME,
                queueName,
                null
        ));
    }
}

消费者:

@Component
public class SimpleRabbitConsumer {

    /**
     * Consumes messages for all users and prints them if they are not blank, otherwise throws an exception.
     *@implNote - in this example we don't need the user id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = RabbitService.EXCHANGE_NAME, type = "topic"),
            key = RabbitService.QUEUE_NAME_PREFIX + "*"
    ))
    public void consume(@NotNull String message) {
        // Let's reject the message it it's blank
        if (StringUtils.isBlank(message)) {
            throw new RuntimeException("Message rejected");
        }

        System.out.println("Message '" + message + "' received.");
    }
}

什么不起作用
当消费者抛出RuntimeException时,消息不会被拒绝,而是会留在常规队列中。
我正在使用 RabbitMQ 管理器插件来查看有关队列的信息。
即使正确使用,消息也会保留在那里。

什么有效
RabbitService#send(...) 方法被调用时,它正确地为指定的用户创建了常规队列和死信队列,但他还没有这些队列。
然后,SimpleRabbitConsumer#consume(...) 会正确接收该消息,然后将其打印出来或引发异常。
如果使用 RabbitMQ 管理器确认消息,则它会正确地从队列中删除。
当消息被拒绝(没有重新排队)时,它会被正确地发送到 DLQ,它会在 DLQ 中停留 10 秒,然后再被移回。

我的尝试
我已将 RuntimeException 更改为 AmqpRejectAndDontRequeueException,但对于 spring.rabbitmq.listener.simple.acknowledge-mode automanual,它的行为仍然相同。
在使用手动确认时,我还尝试将消费者方法更改为:

    public void consume(
            @NotNull String message,
            @NotNull Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag
    ) throws IOException {
        // Let's reject the message it it's blank
        if (StringUtils.isBlank(message)) {
            channel.basicReject(tag, false);
        } else {
            System.out.println("Message '" + message + "' received.");
            channel.basicAck(tag, false);
        }
    }

无济于事。

其他问题
当消费者关闭时队列中有未处理的项目,那么当消费者重新启动时,消息将不会发送给消费者。


我知道解释很长,但是我花了很长时间浏览互联网寻找解决方案没有找到任何解决方案,所以我来到了这里。
我非常欢迎任何有关如何使其发挥作用的建议。

谢谢。


编辑
在调试时,我发现当应用程序启动时,会自动创建一个队列 Q 用于绑定“my-queue.*”的交换。
还创建了一个通道,它连接到消费者和队列 Q,而用户队列没有消费者。 我发送的任何消息似乎都发送到队列 Q 和用户队列,但它只从队列 Q 中删除。

日志如下:

2019-07-31 13:51:54.411  WARN 130172 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.myproject.SimpleRabbitConsumer.consume(java.lang.String)' threw exception
...
Caused by: java.lang.RuntimeException: Message rejected

2019-07-31 13:51:54.456 DEBUG 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Rejecting messages (requeue=false)
2019-07-31 13:51:54.456 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://guest@127.0.0.1:5672/,1) channel.basicNack([1, true, false])
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7f5ea4e8: tags=.......
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7f5ea4e8: tags=.......

可能是当我向 id 为 1 的用户发送消息时,消息通过主题发送到绑定“my-queue.*”和“my-queue.1”,但消费者只处理“my-queue.*”而不是“my-queue.1”?

【问题讨论】:

  • @ArnaudClaudel 你提到的问题是关于消息没有被添加回队列,但我的问题是它们没有被删除。不过,我尝试使用指定的更改。
    当使用acknowledgemode=AUTOdefaultRequeueRejected=false 时,无论消费者是否抛出异常,都不会发生任何变化。 IE。消息仍然停留在队列中。 AmqpRejectAndDontRequeueException 也是如此。我将在日志中添加单独的评论。

标签: java spring-boot rabbitmq


【解决方案1】:

当消费者抛出 RuntimeException 时,消息不会从队列中删除并停留在异常循环中,我认为这是您面临的错误,我之前遇到了同样的问题,只是异常处理 解决了我的问题,你试过了吗?

【讨论】:

  • 消息没有卡在异常循环中,而是永远卡在队列中。在那里,它不会再次被消耗,这意味着没有其他例外。
  • 好的,您尝试过异常处理吗?试试这个 :: stackoverflow.com/questions/36209605/…
  • 异常处理没有帮助,因为异常表示消息被拒绝。如果我要处理它,那么消息将被确认,这不是我想要的。即使我尝试过,它仍然没有将它从队列中删除。我已经从您发送的链接中尝试了一些东西,我已经在this comment 中回复了它。此问题的描述已使用来自 rabbit 的调试日志进行了更新,它可能很有用。
  • @Warozell 这个问题的解决方案或结果是什么? ,它肯定会帮助我应对类似的挑战
  • @Riidonesh 这还没有解决,我们已经实现了自己的排队系统。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-09-22
  • 2016-05-04
  • 2019-05-20
  • 2016-12-15
  • 1970-01-01
  • 2018-08-04
  • 1970-01-01
相关资源
最近更新 更多