【问题标题】:Concurrent Producers block indefinitely in Rabbitmq并发生产者在 Rabbitmq 中无限期阻塞
【发布时间】:2026-01-24 13:10:02
【问题描述】:

我在使用 spring-rabbit-1.3.9.RELEASE 库对 Rabbitmq 3.3.5 进行 POC 时观察到奇怪的行为

当我启动一个生产线程时,事情运行顺利。但是,如果同时启动超过 1 个线程,则只有一个线程完成,所有其他线程都会无限期地阻塞,即使队列变空也是如此。

rabbitmqctl list_connections 监视时,阻塞线程的连接状态保持运行。应该注意的是,当生产者阻塞时,或者在整个运行期间的任何其他时间,都不会发出警报。

我还观察到,如果我在每次发送后休眠 1 毫秒,问题就会消失。

所以,我有这些问题

  1. rabbitmq 不支持并发生产者,高速率发布吗?
  2. 即使连接确实被阻止了,为什么它不显示在 rabbitmqctl list_connections 中?
  3. 为什么他们会无限期地阻塞而不恢复乳清队列变空?

代码

    public static void main(String[] argv) throws java.io.IOException, InterruptedException {
        init();
        PocConfig config = new PocConfig();
        int threadCount = config.getThreadCount();
        final int eventsPerThread = config.getEvents() / threadCount;
        final long sleep = config.getSleep();

        System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
            + sleep + "]");

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executorService.submit(new Runnable() {
                public void run() {
                    produce(eventsPerThread, sleep, threadId);
                }
            });
        }
        waitAndTearDown(executorService);
    }

    private static void produce(int events, long sleep, int threadId)     {
        long start = System.currentTimeMillis();
        for (int index = 1; index <= events; index++) {
            try {
                byte[] message = messageFactory.createTestMessage(index);
                amqpTemplate.convertAndSend(QUEUE_NAME, message);
                if (sleep > 0) {
                    Thread.sleep(sleep);
                }
            } catch (Exception e) {
                LOG.error("Error", e);
            }
        }
        long time = System.currentTimeMillis() - start;
        System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
    }

弹簧配置

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="addresses" value="${addresses}" />
    <property name="username" value="${user}" />
    <property name="password" value="${passwd}" />
    <property name="cacheMode" value="CONNECTION" />
    <property name="connectionCacheSize" value="${threads}" />
    <property name="channelCacheSize" value="10" />
</bean>

<rabbit:template id="template" connection-factory="connectionFactory"
    exchange="testExchange" routing-key="testQueue"/>

【问题讨论】:

    标签: java performance rabbitmq messaging spring-rabbit


    【解决方案1】:

    我想不出什么会阻止,所以我只是运行了你的测试;并且没有问题:

    Start producer with configuration [threadCount=5, events=10, sleep=0]
    Producer:2 finished, events: 1000, Time(s): 0, tps: 4405
    Producer:3 finished, events: 1000, Time(s): 0, tps: 4132
    Producer:1 finished, events: 1000, Time(s): 0, tps: 4048
    Producer:0 finished, events: 1000, Time(s): 0, tps: 3968
    Producer:4 finished, events: 1000, Time(s): 0, tps: 3952
    

    是什么让你认为他们被屏蔽了?

    进行线程转储(例如使用 jstack)以查看线程在做什么。

    编辑

    即使有 100 万条消息和CacheMode CONNECTION...,我仍然无法重现它...

    Start producer with configuration [threadCount=5, events=200000, sleep=0]
    Producer:0 finished, events: 200000, Time(s): 50, tps: 3959
    Producer:3 finished, events: 200000, Time(s): 53, tps: 3746
    Producer:1 finished, events: 200000, Time(s): 55, tps: 3635
    Producer:2 finished, events: 200000, Time(s): 55, tps: 3634
    Producer:4 finished, events: 200000, Time(s): 55, tps: 3629
    

    确实看到队列进入 flow 模式(通过管理 UI),但一切恢复正常。

    我确实看到您的工作人员处于流量控制之下...

    "pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000]
     java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    

    你在兔子日志中看到了什么吗?您在管理 UI 上看到的有关消息速率、状态等的信息是什么?

    不管怎样,这似乎与 Spring AMQP 没有任何关系;您需要联系rabbitmq-users google 群组中的 rabbitmq 人员。

    (我正在使用 rabbitmq 3.4.2 进行测试)。

    EDIT2:

    完全干净安装 3.5.2...

    Start producer with configuration [threadCount=5, events=200000, sleep=0]
    Producer:0 finished, events: 200000, Time(s): 39, tps: 5091
    Producer:1 finished, events: 200000, Time(s): 39, tps: 5002
    Producer:2 finished, events: 200000, Time(s): 40, tps: 4954
    Producer:3 finished, events: 200000, Time(s): 40, tps: 4951
    Producer:4 finished, events: 200000, Time(s): 40, tps: 4939
    

    我在管理 UI 中没有看到 flow 状态(在队列中,但通道/连接显示它们正在流动,但又恢复了)。

    【讨论】:

    • 我正在测试一百万个事件,无论如何,这是线程转储pastebin.com/0B035D0j 的基本部分我知道它们被卡住了,因为它们不打印完成消息,以及消息计数一个线程完成后,兔子不会增加。这是示例日志pastebin.com/37rk8bYp
    • 加里,你是在本地主机上用兔子测试吗?我只在集群中的另一台机器上点击rabbitmq时看到这一点,而不是在本地主机上。关于流模式,是的,我看到一个连接进入流模式,然后恢复,但所有其他连接始终保持运行。
    • Gary,问题似乎出在 ConnectionFactory 上,因为如果我启动 5 个单独的进程,每个进程都有 1 个线程,或者如果我在单个进程中为每个线程有单独的连接工厂,那么事情运行良好。跨度>
    • 通过 LAN 到 3.5.3 集群对我来说工作正常。我不明白它怎么可能是连接工厂;在您的线程转储中,线程显然正在等待底层连接中的套接字写入,因此连接工厂显然可以提供连接。
    • 是的,我同意,它不应该......但奇怪的是,拥有单独的工厂似乎可以解决它。当然这一切都没有意义,你不能复制它,所以我必须从其他来源验证它。有什么方法可以查看为什么线程可能会卡在套接字连接上,rabbitmq 日志没有任何帮助。此外,迁移到 3.5.3 对我来说没有任何改变。