【问题标题】:Paho MQTT while loop is blocking publish to another MQTT clientPaho MQTT while loop 阻止发布到另一个 MQTT 客户端
【发布时间】:2020-04-13 15:52:45
【问题描述】:

我的目标是在来自 IoT 的一些模块之间的后端正确处理 MQTT 消息。我决定实现模块模拟器类,它将接收我的请求或发送响应。

第二个问题是,我需要在发布后等待模块 ACK 或 ERR。对于这个问题,我制作了这样的 ack_blocker 列表:

[
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    },
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    }
]

因此,当我向特定模块发送消息时,blocked 属性将设置为 True,并且在发布消息后我将在 while 循环中等待。另一方面,我发布的消息应该到达我的模拟器 MQTT 客户端,在那里它将解析数据并响应 ERR 或 ACK。在收到消息返回时,我会将阻塞属性设置回 False 并且循环将结束并将消息返回到带有错误或正确消息的后端视图。

问题是,从后端发布的消息永远不会到达模拟器 MQTT 客户端。 IDK 为什么,但在我的循环中是超时(10 秒),在此之后我应该抛出模块没有响应的错误。我非常仔细地调试整个过程,当后端要抛出错误时,我的模拟器客户端最终会收到消息。我跑了更多次,每次都会发生这种情况。所以我认为这个循环会以某种方式阻止消息发送。

这是我的循环实现:

    def send_message(self, mac: str, message: str):
        self.publish(mac, message)
        end_time = time.time() + self.timeout

        while True:
            module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)

            if not module_ack_blocker.get('blocked'):
                response = module_ack_blocker.get('message')

                if response.get('result') == 'OK':
                    logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                    return response
                elif response.get('result') == 'ERROR':
                    raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)

            if time.time() > end_time:
                raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)

如您所见,首先我发布消息。之后,我将计算超时并开始循环。在循环中,我首先在 ack 阻止程序列表中查看正确的 dict(就像我之前提到的那样)。我会问它是否没有被阻止。在那之后,如果我还有时间超时。

我的 mqtt 模拟器客户端如下所示:

class MqttClientEmulator(object):
    def __init__(self):
        self.app = None
        self.broker_host = None
        self.broker_port = None
        self.keep_alive = None
        self.timeout = None

        self.client = mqtt.Client(client_id='brewmaster_client_emulator')

    def init(self, app):
        self.broker_host = os.getenv('BROKER_HOST')
        self.broker_port = int(os.getenv('BROKER_PORT'))
        self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
        self.timeout = int(os.getenv('MQTT_TIMEOUT'))
        self.app = app

        self.client.on_message = self.on_message

    def on_message(self, client, userdata, msg):
        topic = msg.topic
        string_message = str(msg.payload.decode('utf-8'))
        dict_message = json.loads(string_message)

        # Request result
        if dict_message.get('device_uuid'):
            print(dict_message)
            response = {
                "module_mac": topic,
                "sequence_number": 123,
                "result": "OK",
                "details": ""
            }
            time.sleep(1)   # Just for time reserve (this code will be more complicated in future)
            self.publish('brewmaster-backend', json.dumps(response))

    def connect(self):
        self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

    def subscribe(self, name):
        self.client.subscribe(name)

    def publish(self, topic, message):
        self.client.publish(topic, message)

我也尝试过线程,它也没有效果。

【问题讨论】:

    标签: python loops flask mqtt publish


    【解决方案1】:

    好的,我需要更深入地了解 paho MQTT 库。对象 MQTTMessageInfo 有函数 wait_for_publish。如果你看一下 _condition 对象,它已经用信号量实现了锁。所以我需要做的就是将我的 MQTT 客户端方法 send_message(如问题所示)中的 While 循环更改为如下所示:

        def send_message(self, mac: str, message: str):
            result = self.publish(mac, message)
            end_time = time.time() + self.timeout
    
            if result.rc == mqtt.MQTT_ERR_QUEUE_SIZE:
                raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
            with result._condition:
                while True:
                    module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)
    
                    if not module_ack_blocker.get('blocked'):
                        response = module_ack_blocker.get('message')
    
                        if response.get('result') == 'OK':
                            logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                            return response
                        elif response.get('result') == 'ERROR':
                            raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)
    
                    if time.time() > end_time:
                        raise MQTTException('Daný modul neodpovedá.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
                    result._condition.wait(1)
    

    其中结果是 MQTTMessageInfo 对象,_condition.wait(1) 正在等待超时 1 秒。所以基本上什么时候等待所有其他进程都在工作,1 秒后将开始另一个 while 循环迭代并检查消息是否已经到达。

    【讨论】:

      猜你喜欢
      • 2020-01-18
      • 1970-01-01
      • 2018-08-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-06
      • 1970-01-01
      相关资源
      最近更新 更多