【问题标题】:Multiple consumers on one RabbitMQ queue一个 RabbitMQ 队列上有多个消费者
【发布时间】:2023-06-08 02:27:01
【问题描述】:

我正在关注 RabbitMQ 的这份指南:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。我想用一个队列上的多个线程来模拟这个功能。

如果我在启动 Sender 之前启动我的 Receiver,它会按预期工作,如下所示:

[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received 'Hello 0'
[x] Rcvr2 Received 'Hello 1'
[x] Rcvr1 Received 'Hello 2'
[x] Rcvr2 Received 'Hello 3'
[x] Rcvr1 Received 'Hello 4'
[x] Rcvr2 Received 'Hello 5'
[x] Rcvr1 Received 'Hello 6'
[x] Rcvr2 Received 'Hello 7'
[x] Rcvr1 Received 'Hello 8'
...

但是,首先启动我的接收器会导致只有一个线程接收消息(要启动的最后一个线程):

[*] Rcvr2 Waiting for messages...
[*] Rcvr1 Waiting for messages...
[x] Rcvr1 Received 'Hello 9'
[x] Rcvr1 Received 'Hello 10'
[x] Rcvr1 Received 'Hello 11'
[x] Rcvr1 Received 'Hello 12'
[x] Rcvr1 Received 'Hello 13'
[x] Rcvr1 Received 'Hello 14'
[x] Rcvr1 Received 'Hello 15'
...

有趣的是,如果我启动发送方,然后启动接收方,如上所述,然后再次启动发送方(而接收方正在处理第一批)。发送的第一批消息是串行处理的,而第二批消息是并行处理的,或者至少与其余线程一起处理。:

 [*] Rcvr1 Waiting for messages...
 [*] Rcvr2 Waiting for messages...
 [x] Rcvr1 Received '[Batch 1] Hello 0'
 [x] Rcvr1 Received '[Batch 1] Hello 1'
 [x] Rcvr1 Received '[Batch 1] Hello 2'
 [x] Rcvr1 Received '[Batch 1] Hello 3'
 [x] Rcvr1 Received '[Batch 1] Hello 4'
 [x] Rcvr1 Received '[Batch 1] Hello 5'
 [x] Rcvr1 Received '[Batch 1] Hello 6'
 [x] Rcvr1 Received '[Batch 1] Hello 7'
 [x] Rcvr1 Received '[Batch 1] Hello 8'
 [x] Rcvr2 Received '[Batch 2] Hello 1'
 [x] Rcvr1 Received '[Batch 1] Hello 9'
 [x] Rcvr2 Received '[Batch 2] Hello 3'
 [x] Rcvr1 Received '[Batch 1] Hello 10'
 [x] Rcvr2 Received '[Batch 2] Hello 5'
 [x] Rcvr1 Received '[Batch 1] Hello 11'
 [x] Rcvr2 Received '[Batch 2] Hello 7'
 [x] Rcvr1 Received '[Batch 1] Hello 12'
 [x] Rcvr2 Received '[Batch 2] Hello 9'
 [x] Rcvr1 Received '[Batch 1] Hello 13'
 [x] Rcvr2 Received '[Batch 2] Hello 11'

这显然可以使用 RabbitMQ,我不确定我做错了什么。我的简单代码如下:

发件人

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            for(int x=0; x<100; x++) {
                String message = "Hello "+x;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

接收者

package com.mawv.ingest.rabbitmq;

import com.rabbitmq.client.*;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ThreadPoolExecutor rcvrPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Runnable rcvr1 = () -> {
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                System.out.println(" [*] Rcvr1 Waiting for messages...");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    Envelope envelope = delivery.getEnvelope();
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Rcvr1 Received '" + message + "'");
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, true);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ex) { }

                };
                channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {  });

            } catch(Exception ex){
                ex.printStackTrace();
            }
        };
        Runnable rcvr2 = () -> {
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                System.out.println(" [*] Rcvr2 Waiting for messages...");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    Envelope envelope = delivery.getEnvelope();
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Rcvr2 Received '" + message + "'");
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, true);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ex) {
                    }
                };
                channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
                });
            } catch(Exception ex){
                ex.printStackTrace();
            }
        };
        rcvrPool.execute(rcvr1);
        rcvrPool.execute(rcvr2);

    }
}

我也绑定了这个示例,完全按照他们的描述并看到相同的结果。 https://self-learning-java-tutorial.blogspot.com/2015/09/rabbitmq-one-producer-and-multiple.html

我假设我的设置有问题。

【问题讨论】:

    标签: java rabbitmq


    【解决方案1】:

    根据 RabbitMQ api:

    “虽然 Channel 可以被多个线程使用,但重要的是要确保一次只有一个线程执行命令。命令的并发执行可能会导致抛出 UnexpectedFrameError”

    首先我认为你应该为不同的线程使用不同的通道。

    最后我认为第一个线程被终止是因为它是空闲的,所以只有第二个线程处于活动状态并完成整个工作。在这种情况下,一个线程就足够了。

    看看 java 8 的 ThreadPoolExecutor api:

    https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html

    例如,您可以找到:

    "默认情况下,即使是核心线程也只会在新任务到达时才最初创建和启动,但这可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 动态覆盖。如果您使用非空队列”

    “如果池当前有多个 corePoolSize 线程,如果超过 keepAliveTime 的空闲线程将被终止(参见 getKeepAliveTime(TimeUnit))。”

    您应该使用 prestartAllCoreThreads() 或 prestartCoreThreads() 来让核心线程在空闲时启动,或者使用 getKeepAliveTime(TimeUnit) 来让它们在空闲时保持活动状态。

    【讨论】:

    • 嗨撒克逊人,请参阅修改后的代码。我尝试了新的渠道,甚至新的连接和渠道。我什至尝试创建两个具有完全相同结果的应用程序实例(单线程)。第一个启动的接收器收到所有消息,而第二个接收器没有收到任何消息。
    • 你是对的。对不起!代码看的不是很好。
    • 您使用的是哪个 java 客户端版本?
    • 我使用的是客户端版本 5.5.1
    【解决方案2】:

    看起来我错过了关键频道配置。这解决了我的问题:

    channel.basicQos(1);

    这就是 RabbitMQ 不得不说的。

    公平调度

    您可能已经注意到调度仍然无法正常工作 如我们所愿。例如,在有两个工人的情况下,当都是奇数时 消息很重,甚至消息很轻,一个工人将是 一直很忙,而另一个几乎不会做任何工作。出色地, RabbitMQ 对此一无所知,仍然会派发 消息均匀。

    这是因为 RabbitMQ 只是在 消息进入队列。它不看数量 消费者未确认的消息。它只是盲目地发送 每第 n 条消息发送给第 n 个消费者。

    为了解决这个问题,我们可以使用 basicQos 方法 prefetchCount = 1 设置。这告诉 RabbitMQ 不要提供超过 一次向工作人员发送一条消息。或者,换句话说,不要派遣 一个新消息给一个工作人员,直到它处理并确认 前一个。相反,它将把它分派给下一个工人 还不忙。

    【讨论】: