【发布时间】:2020-04-13 15:52:45
【问题描述】:
我的目标是在来自 IoT 的一些模块之间的后端正确处理 MQTT 消息。我决定实现模块模拟器类,它将接收我的请求或发送响应。
第二个问题是,我需要在发布后等待模块 ACK 或 ERR。对于这个问题,我制作了这样的 ack_blocker 列表:
[
{
"module_mac": "mac",
"blocked": False,
"message": {}
},
{
"module_mac": "mac",
"blocked": False,
"message": {}
}
]
因此,当我向特定模块发送消息时,blocked 属性将设置为 True,并且在发布消息后我将在 while 循环中等待。另一方面,我发布的消息应该到达我的模拟器 MQTT 客户端,在那里它将解析数据并响应 ERR 或 ACK。在收到消息返回时,我会将阻塞属性设置回 False 并且循环将结束并将消息返回到带有错误或正确消息的后端视图。
问题是,从后端发布的消息永远不会到达模拟器 MQTT 客户端。 IDK 为什么,但在我的循环中是超时(10 秒),在此之后我应该抛出模块没有响应的错误。我非常仔细地调试整个过程,当后端要抛出错误时,我的模拟器客户端最终会收到消息。我跑了更多次,每次都会发生这种情况。所以我认为这个循环会以某种方式阻止消息发送。
这是我的循环实现:
def send_message(self, mac: str, message: str):
self.publish(mac, message)
end_time = time.time() + self.timeout
while True:
module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)
if not module_ack_blocker.get('blocked'):
response = module_ack_blocker.get('message')
if response.get('result') == 'OK':
logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
return response
elif response.get('result') == 'ERROR':
raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)
if time.time() > end_time:
raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
如您所见,首先我发布消息。之后,我将计算超时并开始循环。在循环中,我首先在 ack 阻止程序列表中查看正确的 dict(就像我之前提到的那样)。我会问它是否没有被阻止。在那之后,如果我还有时间超时。
我的 mqtt 模拟器客户端如下所示:
class MqttClientEmulator(object):
def __init__(self):
self.app = None
self.broker_host = None
self.broker_port = None
self.keep_alive = None
self.timeout = None
self.client = mqtt.Client(client_id='brewmaster_client_emulator')
def init(self, app):
self.broker_host = os.getenv('BROKER_HOST')
self.broker_port = int(os.getenv('BROKER_PORT'))
self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
self.timeout = int(os.getenv('MQTT_TIMEOUT'))
self.app = app
self.client.on_message = self.on_message
def on_message(self, client, userdata, msg):
topic = msg.topic
string_message = str(msg.payload.decode('utf-8'))
dict_message = json.loads(string_message)
# Request result
if dict_message.get('device_uuid'):
print(dict_message)
response = {
"module_mac": topic,
"sequence_number": 123,
"result": "OK",
"details": ""
}
time.sleep(1) # Just for time reserve (this code will be more complicated in future)
self.publish('brewmaster-backend', json.dumps(response))
def connect(self):
self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
self.client.loop_start()
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, name):
self.client.subscribe(name)
def publish(self, topic, message):
self.client.publish(topic, message)
我也尝试过线程,它也没有效果。
【问题讨论】:
标签: python loops flask mqtt publish