【问题标题】:How to move messages from one queue to another in RabbitMQ如何在 RabbitMQ 中将消息从一个队列移动到另一个队列
【发布时间】:2018-07-28 11:32:29
【问题描述】:

在 RabbitMQ 中,我有一个失败队列,其中包含来自不同队列的所有失败消息。现在我想提供“重试”的功能,以便管理员可以再次将失败的消息移动到各自的队列中。这个想法是这样的:

上图是我的失败队列的结构。单击重试链接后,消息应移入原始队列,即 queue1、queue2 等。

【问题讨论】:

  • 我不知道你在这里找什么。您必须编写代码来执行此操作,并且必须在某个地方发布消息。
  • 我不知道你真的想尝试被零除多少次,但你永远不会得到不同的答案????
  • 嗨,我只想将消息从一个队列移动到另一个队列。我只是想检查一下,rabbitMQ 中是否可以将任何消息从一个队列移动到另一个队列。我知道铲子插件会有所帮助,但它将整个队列移动到另一个队列中。我需要一个一个地随机移动它们。如果可能的话,我正在寻找 Java 实现。
  • 我认为您需要熟悉 DLX:rabbitmq.com/dlx.html
  • 不知道为什么这个问题是-1

标签: rabbitmq message-queue spring-amqp messagebroker rabbitmq-exchange


【解决方案1】:

如果您正在寻找一个 Java 代码来执行此操作,那么您必须简单地使用您想要移动的消息并将这些消息发布到所需的队列。对基本的消费和发布操作不熟悉的可以去rabbitmq的Tutorials页面看看。

【讨论】:

  • 是否可以在不删除消息 #1 的情况下从我的失败队列中移动消息 #2?我还没有找到任何方法来做到这一点。
  • RabbitMQ 不是这样设计的,如果你知道队列应该如何工作,你就会明白这一点。生产者将消息生成到队列中,消费者从该队列中消费消息,一切都以先进先出的方式进行。所以,你说的是RabbitMQ应该通过不同的数据结构来实现,这不是目前的情况。
  • 谢谢阿潘。和我想的一样。
【解决方案2】:

这不是直接的消费和发布。 RabbitMQ 不是这样设计的。它考虑到交换和队列都可以是临时的并且可以被删除。这是嵌入在通道中以在单次发布后关闭连接。

假设: - 你有一个持久的队列和目的地交换(发送到) - 你有一个持久的目标队列(取自)

这是执行此操作的代码:

        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.QueueingConsumer;
        import org.apache.commons.lang.StringUtils;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;    

        public object shovelMessage(
                     String exchange,
                     String targetQueue,
                     String destinationQueue,
                     String host,
                     Integer port,
                     String user,
                     String pass,
                     int count) throws IOException, TimeoutException, InterruptedException {

                if(StringUtils.isEmpty(exchange) || StringUtils.isEmpty(targetQueue) || StringUtils.isEmpty(destinationQueue)) {
                    return null;
                }

                CachingConnectionFactory factory = new CachingConnectionFactory();
                factory.setHost(StringUtils.isEmpty(host)?internalHost.split(":")[0]:host);
                factory.setPort(port>0 ? port: Integer.parseInt(internalPort.split(":")[1]));
                factory.setUsername(StringUtils.isEmpty(user)? this.user: user);
                factory.setPassword(StringUtils.isEmpty(pass)? this.pass: pass);
                Channel tgtChannel = null;
                try {
                    org.springframework.amqp.rabbit.connection.Connection connection = factory.createConnection();

                    tgtChannel = connection.createChannel(false);
                    tgtChannel.queueDeclarePassive(targetQueue);

                    QueueingConsumer consumer = new QueueingConsumer(tgtChannel);
                    tgtChannel.basicQos(1);
                    tgtChannel.basicConsume(targetQueue, false, consumer);

                    for (int i = 0; i < count; i++) {
                        QueueingConsumer.Delivery msg = consumer.nextDelivery(500);
                        if(msg == null) {
        // if no message found, break from the loop.
                            break;
                        }
                        //Send it to destination Queue
                        // This repetition is required as channel looses the connection with 
                        //queue after single publish and start throwing queue or exchange not 
                        //found connection.
                        Channel destChannel = connection.createChannel(false);
                        try {
                            destChannel.queueDeclarePassive(destinationQueue);
    SerializerMessageConverter serializerMessageConverter = new SerializerMessageConverter();
     Message message = new Message(msg.getBody(), new MessageProperties());
                          Object o = serializerMessageConverter.fromMessage(message);
// for some reason msg.getBody() writes byte array which is read as a byte array // on the consumer end due to which this double conversion.
                            destChannel.basicPublish(exchange, destinationQueue, null, serializerMessageConverter.toMessage(o, new MessageProperties()).getBody());
                            tgtChannel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
                        } catch (Exception ex) {
                            // Send Nack if not able to publish so that retry is attempted
                            tgtChannel.basicNack(msg.getEnvelope().getDeliveryTag(), true, true);
                            log.error("Exception while producing message ", ex);
                        } finally {
                            try {
                                destChannel.close();
                            } catch (Exception e) {
                                log.error("Exception while closing destination channel ", e);
                            }

                        }
                    }

                } catch (Exception ex) {
                    log.error("Exception while creating consumer ", ex);
                } finally {
                    try {
                        tgtChannel.close();
                    } catch (Exception e) {
                        log.error("Exception while closing destination channel ", e);
                    }
                }

                return null;

            }

【讨论】:

    【解决方案3】:

    要重新排队消息,您可以使用receiveAndReply method。以下代码会将所有消息从dlq-queue 移动到queue-queue:

    do {
        val movedToQueue = rabbitTemplate.receiveAndReply<String, String>(dlq, { it }, "", queue)
    } while (movedToQueue)
    

    在上面的代码示例中,dlq 是源队列,{ it } 是标识函数(您可以在此处转换消息),"" 是默认交换器,queue 是目标队列。

    【讨论】:

      【解决方案4】:

      我也实现了类似的东西,所以我可以将消息从 dlq 移回处理。链接:https://github.com/kestraa/rabbit-move-messages

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-07-12
        • 1970-01-01
        • 2015-05-22
        • 2011-03-15
        相关资源
        最近更新 更多