【问题标题】:HiveMQ shared subscription with persistent session具有持久会话的 HiveMQ 共享订阅
【发布时间】:2017-02-14 16:18:53
【问题描述】:

尝试结合 HiveMQ 的两个特性:共享订阅和持久会话。

如果已经创建了一个非常简单的消息生产者。和一个非常简单的消费者。 运行多个消费者时,所有消费者都会收到所有消息。

将消费者的clearSession设置为'false'后,在运行消费者时,重新启动消费者,消费者在未连接时也会收到消息。太棒了。

现在将它与共享订阅功能相结合。 仅使用共享订阅时,clearSession 为“true”。运行多个消费者时,一条消息仅由单个消费者接收。它应该是循环的,情况也是如此,但是一旦您停止消费者,消息就不再是循环的,而是其中一个消费者比其他消费者获得的消息要多得多。

如果我现在再次启用持久会话,clearSession 为“假”,并启动共享订阅消费者,消费者开始再次接收所有消息,而不是消息仅传递给一个客户端。

这里有什么问题? 这是 HiveMQ 中的错误吗? 持久会话和共享订阅不能一起使用吗?那真的很丢脸。

2017 年 15 月 2 日更新 正如@fraschbi 建议的那样,我清除了所有数据并再次使用持久会话消费者重新测试了共享订阅。它似乎有效!

但奇怪的是,只有在第一个消费者重新连接后才会收到丢失的消息。 所有消费者都有相同的代码,他们只是从不同的 clientId 参数开始。请参阅下面的代码。 我的测试序列:

  • 已启动消费者 1:所有消息都发送到此消费者。
  • 已启动消费者 2:每个消费者都会收到其他消息。
  • 已启动消费者 3:每个消费者收到 3 条消息中的 1 条。
  • 停止消费者 1:现在消费者 2 和消费者 3 收到所有其他消息。 (不知道为什么我昨天看到了这种分布不均的情况,但可能正如@fraschbi 提到的那样,因为我正在重用 clientId 并且没有取消订阅或正确断开连接)
  • 现在停止 consumer2:consumer3 现在收到的所有消息。
  • 停止 consumer3:不再收到任何消息。
  • 重新启动consumer3:它继续生产者发送的第一条消息。 它不接收丢失的消息
  • 重启consumer2:消息再次均匀分布。
  • 重新启动 consumer1:这个现在接收所有丢失的消息,然后继续接收每 3 条消息中的 1 条。

所以我的新问题是:为什么只有第一个消费者会收到丢失的消息?

注意:这里的技巧仍然是在停止客户端时不要取消订阅,因为这样订阅/持久性设置就会丢失!

Producer.scala

object Producer extends App {

  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"

  val clientId = UUID.randomUUID().toString

  val client = new MqttClient(brokerUrl, clientId)
  client.connect()
  val theTopic = client.getTopic(topic)

  var count = 0

  sys.addShutdownHook {
    println("Disconnecting client...")
    client.disconnect()
    println("Disconnected.")
  }

  while(true) {
    val msg = new MqttMessage(s"Message: $count".getBytes())
    theTopic.publish(msg)
    println(s"Published: $msg")

    Thread.sleep(1000)

    count = count + 1
  }
}

Consumer.scala

object Consumer extends App {

  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"

  val clientId = args(1)
//  val clientId = UUID.randomUUID().toString

  val client = new MqttClient(brokerUrl, clientId)
  client.setCallback(new MqttCallback {
    override def deliveryComplete(token: IMqttDeliveryToken) = ()

    override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")

    override def connectionLost(cause: Throwable) = println("Connection lost")
  })

  println(s"Start $clientId consuming from topic: $topic")
  val options = new MqttConnectOptions()
  options.setCleanSession(false);

  client.connect(options)
  client.subscribe(topic)

  sys.addShutdownHook {
    println("Disconnecting client...")
//    client.unsubscribe(topic)
    client.disconnect()
    println("Disconnected.")
  }


  while(true) {

  }

}

【问题讨论】:

    标签: hivemq


    【解决方案1】:

    我将尝试分别回答您遇到的两个问题。

    它应该是循环的,情况也是如此,但是一旦您停止消费者,消息就不再是循环的,而是其中一个消费者获得的消息明显多于其他消费者。

    在为共享订阅分发消息时,HiveMQ 确实更喜欢在线客户端。

    如果我现在再次启用持久会话,clearSession 为“假”,并启动共享订阅消费者,消费者开始再次接收所有消息,而不是消息仅传递给一个客户端。

    在您问题的开头,您说您正在将带有cleanSession=false 的客户连接到代理并订阅该主题。 (听起来好像您只使用一个主题。) 在重新连接cleanSession=false 和共享订阅之前,您是否有可能没有取消订阅这些客户端?在这种情况下,您的场景第一步中的订阅仍会为这些客户端保留,并且自然而然地它们都会收到消息。

    编辑:

    所以我的新问题是:为什么只有第一个消费者会收到丢失的消息?

    来自 HiveMQ 用户指南:

    当客户端离线队列已满时,该客户端的消息不会被丢弃,而是排队等待共享订阅组中的下一个离线客户端。

    当所有客户端都离线时,分发不再是轮询。所以你描述的场景在预期的行为范围内。

    消息队列的默认值为 1000。因此,您可以在客户端离线时发送超过 1000 条消息,或者减小消息队列大小。

    ...
    <persistence>
         <queued-messages>
    
             <max-queued-messages>50</max-queued-messages>
    
         </queued-messages>
        ...
    </persistence>
    ...
    

    将此添加到您的config.xml 以减小消息队列大小。

    【讨论】:

    • 是的,客户没有退订。刚停下来。明天再测试。