【问题标题】:Send many publish message: Too many publishes in progress Error发送许多发布消息:正在进行的发布太多错误
【发布时间】:2026-01-01 02:00:02
【问题描述】:

这里是 paho 异步客户端:

    client = new MqttAsyncClient(appProps.getProperty("mqtt.broker"),
            appProps.getProperty("mqtt.clientId"), new MemoryPersistence());
    client.setCallback(this);
    client.connect(null, new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken imt) {
            try {
                client.subscribe(Constants.internalTopics, Constants.internalTopicQOS);
            } catch (MqttException ex) {
                ex.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken imt, Throwable thrwbl) {
            thrwbl.printStackTrace();
        }
    });

我在这里循环发送消息:

        while (iterator.hasNext()) {
            try {
               client.publish("user/" + userId + "/downstream", mqttMessage);
            } catch(Exception ex) {
                ex.printStackTrace();
            }
        }

错误:

Too many publishes in progress (32202)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:436)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:858)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:836)

我正在使用Rabbitmq

【问题讨论】:

  • 一些上下文在这里非常有用,比如队列中有多少消息?当错误出现时,您在列表中的位置有多远?
  • 我有一个我正在迭代的 1000 个列表。我发现在达到 200-300 后它开始抛出错误。那时还有一件事,所有这 1000 个主题都没有订阅者。所以只是发布者没有订阅者

标签: rabbitmq mqtt paho


【解决方案1】:

查看 Paho 客户端的 source,看起来在任何给定时间默认的最大飞行消息数为 10。

因此,考虑到您的发布循环有多紧凑,它只会在网络层稍微放慢速度,并且您最终会在任何给定时间发送超过 10 条消息。如果您尝试以大于 0 的 QOS 发送,这只会变得更糟。

您可以使用传递给client.connect() 方法的MQTTConnectionsOptions 对象上的setMaxInflight(int n) 方法更改默认值。

我建议你尝试找到一个合适的值。

【讨论】:

  • 在网络应用程序中,这是常见的扫描仪,您从网络用户那里收到 1000 条消息,并且必须交付。那么这些情况是如何处理的呢?
  • 为限制设置一个合适的值
  • 另外,您不应该将相同的消息发布到 1000 个不同的主题,将其发布到 1 个主题并拥有 1000 个订阅者。
  • 你将如何设计一个场景。假设您有 100 万条帖子,每次向帖子添加评论时,都应该通知所有其他评论该特定帖子的用户。我正在做的是遍历所有用户 ID 并将消息发送到 user/<user_id> 主题
  • 每个故事一个主题,当用户订阅故事主题时