【发布时间】:2021-02-27 17:01:09
【问题描述】:
我真的很难让这个基本概念发挥作用,我可能在这个问题上花了 15 个小时,寻找答案的时间充满了无用的矛盾信息。
我有一个订阅 MQTT 主题的客户端进程,它会根据收到的消息类型做出反应。然后我想将数据发送回岸边进程。
我已经尝试过永远循环,开始循环,不循环,打开并打开,但数据没有出现在 MQTT Explorer 中(1883 上的本地端口转发以检查实时提要)
代码
主要
import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')
def main():
client = mqtt.Client()
client.tls_set(-setup tls certs-)
client.on_connect = mqtt_act.on_connect
client.on_message = mqtt_act.on_message
client.on_log = mqtt_act.on_log
client.connect(-connect to broker-)
client.loop_forever()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
if __name__ == "__main__":
main()
MQTT 操作
import json
import logging
import time
import subprocess
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def send_ack(client, _id):
client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
logging.info("send ack")
client.loop()
def on_log(client,userdata,level,buff):
print(buff)
def on_connect(client, userdata, flags, rc):
logging.info("Connected with return code: "+str(rc))
# get some sort of identifier
_id = get_id()
client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
_ = json.loads(msg.payload.decode())
logging.info("Received message from master")
if _['action'] == "test123":
_id = _['id']
logging.info("ID: "+str(_id))
send_ack(client, _id)
outcome = get_data()
outcome['id'] = str(_id)
try:
client.publish("a/topic/endpoint/"+str(_id), str(outcome))
client.loop()
except Exception as e:
logging.info(str(e))
输出
host:~$:/tmp/client_test# python3 listen.py
Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Received CONNACK (0, 0)
Sending SUBSCRIBE (d0, m1) [(b'a/topic/222/#', 0)]
Received SUBACK
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ... (89 bytes)
Sending PUBLISH (d0, q0, r0, m2), 'b'a/topic/endpoint/state_change'', ... (122 bytes)
Sending PUBLISH (d0, q0, r0, m3), 'b'a/topic/endpoint/64cb76fa-791b-11eb-bde4-005056ae7e22'', ... (2596 bytes)
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ... (89 bytes)
MQTT Explorer 中显示的内容
- state_change
- test123
【问题讨论】:
-
如果您已开始使用
client.loop_forever()或client.loop_start(),则不应在任何地方致电client.loop() -
@hardillb - 我相信我今天早些时候尝试过,它一直在尝试 send_ack(),将重试并确认
-
@hardillb - 是的,刚刚尝试删除所有
client.loop(),它一直在发布send_ack()中的状态更改 -
@hardillb - 这就是我对网络上相互矛盾的信息的意思,我不想等待
on_message()完成,否则send_ack()没有用,因为它告诉客户已收到并正在采取行动的岸边,因此我必须手动使用client.loop()- 你有什么建议? -
您应该不在回调中运行长时间运行的任务。我希望你有一个长时间运行的任务应该交给一个单独的线程
标签: python python-3.x mqtt paho