【问题标题】:Feedback message from subscriber in mqtt protocolmqtt 协议中订阅者的反馈消息
【发布时间】:2021-03-04 23:34:41
【问题描述】:

我使用 MQTT 协议在两台计算机之间发送消息。我已经从这段代码中得到了模式。 发布者:

import paho.mqtt.client as mqtt
from random import randrange, uniform
import time

mqttBroker ="mqtt.eclipse.org" 

client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker) 

while True:
    randNumber = randrange(10)

    client.publish("TEMPERATURE", randNumber)
    print("Just published " + str(randNumber) + " to topic TEMPERATURE")
    time.sleep(1)

订阅者:

import paho.mqtt.client as mqtt
import time

def on_message(client, userdata, message):
    print("received message: " ,str(message.payload.decode("utf-8")))

mqttBroker ="mqtt.eclipse.org"

client = mqtt.Client("Smartphone")
client.connect(mqttBroker) 

client.loop_start()

client.subscribe("TEMPERATURE")
client.on_message=on_message 

time.sleep(1)
client.loop_stop()

我希望在收到消息时向发布者发送反馈。有没有办法获得消息反馈?

【问题讨论】:

  • 当你说反馈时,你到底是什么意思。您想要确认订阅者已收到消息,还是想要基于订阅者解析消息的具体反馈?我相信,如果您只是想确认它到达接收器,您可以使用 QoS(服务质量)和 mqtt 来做到这一点。否则我想你将不得不让发布者订阅一个单独的频道,订阅者在完成后在该频道上发布
  • ????????正是我想要确认订阅者已收到消息。
  • 好的,我留下了一个答案来展示你如何做到这一点

标签: sockets client-server mqtt feedback


【解决方案1】:

MQTT 协议中没有端到端的传递通知。这是很刻意的。

MQTT 是一个发布/订阅系统,旨在将信息的生产者与消费者完全分开。当生产者发布消息时,可能有 0 到无限数量的主题订阅者。也可能有离线订阅者会在他们重新连接时传递消息(可能是在消息发布后的任何时间)

MQTT 提供的是 QOS 级别,但重要的是要记住,这些级别仅适用于交付过程的单程。例如。在 QOS 2 发布的消息确保它会到达代理,但不保证任何订阅者的订阅可能处于 QOS 0。

如果您的系统需要端到端传送通知,那么您需要自己实现响应消息,这通常是晚餐,通过在初始消息中包含唯一 ID 并在也包含该 ID 的不同主题中发送单独的响应消息

【讨论】:

    【解决方案2】:

    为确保您的消息能够送达,您可以使用 QoS。这可以在发布或订阅时设置。因此,对于您的情况,您将需要 QoS 1 或 2。QoS 2 确保它在发布时恰好到达代理一次,并且如果以 QoS 2 订阅,它将确保订阅者仅获得一次消息。请注意,尽管 QoS 2 是最慢的发布和订阅形式。我发现处理消息最常见的方法是使用 QoS 1,然后在您的订阅 on_message 中您可以确定如何自己处理重复消息。 paho mqtt 客户端允许您在发布或订阅时设置 QoS,但默认为 0。我在下面更新了您的代码。

    # publisher
    import paho.mqtt.client as mqtt
    from random import randrange, uniform
    import time
    import json
    
    mqttBroker ="mqtt.eclipse.org" 
    
    client = mqtt.Client("Temperature_Inside")
    client.connect(mqttBroker) 
    
    id = 1
    while True:
        randNumber = randrange(10)
        message_dict = { 'id': id, 'value': randNumber }
    
        client.publish("TEMPERATURE", json.dumps(message_dict), 1)
        print("Just published " + str(randNumber) + " to topic TEMPERATURE")
        id += 1
        time.sleep(1)
    
    # subscriber
    import paho.mqtt.client as mqtt
    import time
    import json
    from datetime import datetime
    
    parsed_messages = {}
    
    def on_message(client, userdata, message):
        json_body = json.loads(str(message.payload.decode("utf-8")))
        msg_id = json_body['id']
        if msg_id in parsed_messages.keys
            print("Message already recieved at: ", parsed_messages[msg_id].strftime("%H:%M:%S"))
        else
            print("received message: " ,json_body['value'])
            parsed_messages[msg_key] = datetime.now()
        
    
    mqttBroker ="mqtt.eclipse.org"
    
    client = mqtt.Client("Smartphone")
    client.connect(mqttBroker) 
    
    client.loop_start()
    
    client.subscribe("TEMPERATURE", 1)
    client.on_message=on_message 
    
    time.sleep(1)
    client.loop_stop()
    

    请注意,订阅者在订阅时还要定义 QoS 1,否则它将使用 paho 客户端的默认 QoS 0 进行订阅,并且消息将降级为 0,这意味着消息将最多传递一次(但如果数据包丢失,可能根本无法送达)。如上所述,仅确保消息将被订阅者接收。如果您希望发布者在订阅者处理完消息时收到通知,您将需要在订阅者完成发布者可以订阅的处理后发布一个新主题(带有一些 uuid)。然而,当我看到这样做时,我经常质疑为什么要使用 MQTT,而不仅仅是发送 HTTP 请求。如果您有兴趣,Here 是关于 MQTT QoS 的一个很好的链接(尽管它缺乏关于订阅方发生的事情的详细信息)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多