【问题标题】:Paho MQTT - Log says message published but nothing sent outPaho MQTT - 日志显示消息已发布但未发送任何内容
【发布时间】:2021-02-27 17:01:09
【问题描述】:

我真的很难让这个基本概念发挥作用,我可能在这个问题上花了 15 个小时,寻找答案的时间充满了无用的矛盾信息。

我有一个订阅 MQTT 主题的客户端进程,它会根据收到的消息类型做出反应。然后我想将数据发送回岸边进程。

我已经尝试过永远循环,开始循环,不循环,打开并打开,但数据没有出现在 MQTT Explorer 中(1883 上的本地端口转发以检查实时提要)

代码

主要

import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')

def main():
    client = mqtt.Client()
    client.tls_set(-setup tls certs-)
    client.on_connect = mqtt_act.on_connect
    client.on_message = mqtt_act.on_message
    client.on_log = mqtt_act.on_log
    client.connect(-connect to broker-)
    client.loop_forever()
    try:
     while True:
      time.sleep(1)
    except KeyboardInterrupt:
        client.loop_stop()

if __name__ == "__main__":
    main()

MQTT 操作

import json
import logging
import time
import subprocess 
logger = logging.getLogger()
logger.setLevel(logging.INFO)    

def send_ack(client, _id):
    client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
    logging.info("send ack")
    client.loop()

def on_log(client,userdata,level,buff):
    print(buff)

def on_connect(client, userdata, flags, rc):
        logging.info("Connected with return code: "+str(rc))
        # get some sort of identifier
        _id = get_id()
        client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
        _ = json.loads(msg.payload.decode())
        logging.info("Received message from master")
        if _['action'] == "test123":
            _id = _['id']
            logging.info("ID: "+str(_id))
            send_ack(client, _id)
            outcome = get_data()
            outcome['id'] = str(_id)
            try:
             client.publish("a/topic/endpoint/"+str(_id), str(outcome))
             client.loop()
            except Exception as e:
                logging.info(str(e))

输出

host:~$:/tmp/client_test# python3 listen.py 
Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Received CONNACK (0, 0)
Sending SUBSCRIBE (d0, m1) [(b'a/topic/222/#', 0)]
Received SUBACK
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ...  (89 bytes)
Sending PUBLISH (d0, q0, r0, m2), 'b'a/topic/endpoint/state_change'', ... (122 bytes)
Sending PUBLISH (d0, q0, r0, m3), 'b'a/topic/endpoint/64cb76fa-791b-11eb-bde4-005056ae7e22'', ... (2596 bytes)
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ...  (89 bytes)

MQTT Explorer 中显示的内容

  • state_change
  • test123

【问题讨论】:

  • 如果您已开始使用client.loop_forever()client.loop_start(),则不应在任何地方致电client.loop()
  • @hardillb - 我相信我今天早些时候尝试过,它一直在尝试 send_ack(),将重试并确认
  • @hardillb - 是的,刚刚尝试删除所有client.loop(),它一直在发布send_ack()中的状态更改
  • @hardillb - 这就是我对网络上相互矛盾的信息的意思,我不想等待on_message() 完成,否则send_ack() 没有用,因为它告诉客户已收到并正在采取行动的岸边,因此我必须手动使用client.loop() - 你有什么建议?
  • 您应该在回调中运行长时间运行的任务。我希望你有一个长时间运行的任务应该交给一个单独的线程

标签: python python-3.x mqtt paho


【解决方案1】:

所以我接受了给出的建议并设置了线程,我还将loop_forever() 更改为loop_start()

主要

import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')

def main():
    client = mqtt.Client()
    client.tls_set(-setup tls certs-)
    client.on_connect = mqtt_act.on_connect
    client.on_message = mqtt_act.on_message
    client.on_log = mqtt_act.on_log
    client.connect(-connect to broker-)
    client.loop_start()
    try:
     while True:
      time.sleep(1)
    except KeyboardInterrupt:
        client.loop_stop()

if __name__ == "__main__":
    main()

MQTT

import json
import logging
import time
import threading
import subprocess 
logger = logging.getLogger()
logger.setLevel(logging.INFO)    

def send_ack(client, _id):
    client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
    logging.info("send ack")

def on_log(client,userdata,level,buff):
    print(buff)

def on_connect(client, userdata, flags, rc):
        logging.info("Connected with return code: "+str(rc))
        # get some sort of identifier
        _id = get_id()
        client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
        _ = json.loads(msg.payload.decode())
        logging.info("Received message from master")
        if _['action'] == "test123":
            _id = _['id']
            logging.info("ID: "+str(_id))
            send_ack(client, _id)
            t1 = threading.Thread(target=start_test, args=(_id, 5,client,), name='test_thread')
            t1.start()

def start_test(ID, int_a, client):
    outcome = get_data()
    outcome['id'] = ID
    data = json.dumps(outcome)
    try:
        client.publish("a/test/endpoint/"+str(outcome['id']), data)
    except Exception as e:
        logging.info(str(e))

【讨论】:

  • 如果您使用 loop_start() 则不会,因为它将处理后台线程上的保持活动消息
  • 你不应该在回调中做阻塞任务
  • @hardillb - 好的,如果您知道解决方案,请提供适当的回复,我会审核并标记为答案,否则我会在几天后自己标记
  • @hardillb - 我已更新此答案以实现您的建议,您能否查看并确认或建议?
  • 你还在on_message()回调中调用get_data(),在send_ack()中调用client.loop()
猜你喜欢
  • 2022-07-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-03-26
  • 2012-08-28
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多