【发布时间】:2023-09-03 14:39:01
【问题描述】:
我有一个 Flask 应用程序,它在某些休息调用时使用 ProcessPoolExecutor 运行多个模块。
更新:添加redis作为消息队列(使用docker,redis作为redis的宿主)
socketio = SocketIO(app, message_queue='redis://redis')
(...)
def emit_event(evt, message):
socketio.emit(evt, message, namespace='/test')
@app.route('/info', methods=['GET'])
def info():
emit_event('update_reports', '')
(...)
if __name__ == "__main__":
socketio.run(host='0.0.0.0', threaded=True)
现在我添加了 redis,它在从主应用程序发射时仍然有效。 这里有一些来自我正在运行子流程的代码:
def __init__(self):
self.executor = futures.ProcessPoolExecutor(max_workers=4)
self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')
(...)
future = self.executor.submit(process, params)
future.add_done_callback(functools.partial(self.finished_callback, pid))
然后在那个回调中我调用emit_event 方法:
def finished_callback(self, pid, future):
pid.status = Status.DONE.value
pid.finished_at = datetime.datetime.utcnow
pid.save()
self.socketio.emit('update_reports', 'done', namespace='/test')
从我的控制器向客户端获取和发送/发送消息工作得很好,如果我从 curl 或 postman 调用 /info,我的客户端也会收到消息 - 但是 - 当尝试从其中以相同的方式发出事件时子进程回调,现在它显示这个错误:
这主要用于通知,例如通知一个长流程何时完成等。
INFO:socketio:emitting event "update_reports" to all [/test]
ERROR:socketio:Cannot publish to redis... retrying
ERROR:socketio:Cannot publish to redis... giving up
我做错了什么?
谢谢!
【问题讨论】:
-
您可以添加用于从子流程发出的代码吗?您还提到了
/info路线,但您的示例有/test。我认为这是您从 curl 调用的路线? -
感谢您的回复 Miguel,在子流程中,我只是从它所在的主 py 文件中导入
emit_event方法,并以与在/test调用中使用它相同的方式使用它.是的,这就是我要调用的 url。结果是子流程中的事件/消息永远不会到达(并且控制台中没有错误) -
@Miguel 我刚刚添加了一些代码,让我知道你的想法
标签: python-3.x flask socket.io flask-socketio