【问题标题】:Flask-SocketIO - How to emit an event from a sub-processFlask-SocketIO - 如何从子进程发出事件
【发布时间】:2023-09-03 14:39:01
【问题描述】:

我有一个 Flask 应用程序,它在某些休息调用时使用 ProcessPoolExecutor 运行多个模块。

更新:添加redis作为消息队列(使用docker,redis作为redis的宿主)

socketio = SocketIO(app, message_queue='redis://redis')

(...)

def emit_event(evt, message):
    socketio.emit(evt, message, namespace='/test')

@app.route('/info', methods=['GET'])
def info():
    emit_event('update_reports', '')

(...)
if __name__ == "__main__":
    socketio.run(host='0.0.0.0', threaded=True)

现在我添加了 redis,它在从主应用程序发射时仍然有效。 这里有一些来自我正在运行子流程的代码:

def __init__(self):
    self.executor = futures.ProcessPoolExecutor(max_workers=4)
    self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')

    (...)
    future = self.executor.submit(process, params)
    future.add_done_callback(functools.partial(self.finished_callback, pid))

然后在那个回调中我调用emit_event 方法:

def finished_callback(self, pid, future):
    pid.status = Status.DONE.value
    pid.finished_at = datetime.datetime.utcnow
    pid.save()

    self.socketio.emit('update_reports', 'done', namespace='/test')

从我的控制器向客户端获取和发送/发送消息工作得很好,如果我从 curl 或 postman 调用 /info,我的客户端也会收到消息 - 但是 - 当尝试从其中以相同的方式发出事件时子进程回调,现在它显示这个错误:

这主要用于通知,例如通知一个长流程何时完成等。

INFO:socketio:emitting event "update_reports" to all [/test] ERROR:socketio:Cannot publish to redis... retrying ERROR:socketio:Cannot publish to redis... giving up

我做错了什么?

谢谢!

【问题讨论】:

  • 您可以添加用于从子流程发出的代码吗?您还提到了/info 路线,但您的示例有/test。我认为这是您从 curl 调用的路线?
  • 感谢您的回复 Miguel,在子流程中,我只是从它所在的主 py 文件中导入 emit_event 方法,并以与在 /test 调用中使用它相同的方式使用它.是的,这就是我要调用的 url。结果是子流程中的事件/消息永远不会到达(并且控制台中没有错误)
  • @Miguel 我刚刚添加了一些代码,让我知道你的想法

标签: python-3.x flask socket.io flask-socketio


【解决方案1】:

在设置 Flask-SocketIO 扩展时需要遵循一些特定规则,以便外部进程可以发出,其中包括使用主进程和外部进程用来协调工作的消息队列。有关说明,请参阅文档的 Emitting from an External Process 部分。

【讨论】:

  • 感谢您的回答。我刚刚尝试从我的子进程运行那里写的内容,但是这是我在日志中得到的 Cannot publish to redis... retrying, giving up
  • 我刚刚用更多细节和代码更新了这个问题。
  • 好吧,您的子进程似乎没有连接到 Redis。您可以尝试编写一个连接到消息队列并发出的简单脚本吗?如果可行,那么您必须弄清楚您的子进程中的什么影响了与 redis 的连接。
  • 子进程不需要使用eventlet,只有主进程使用。您在子流程中所做的似乎与 eventlet 不兼容,所以最好不要在那里使用它。您可以通过在 SocketIO 类构造函数中传递 async_mode='threading' 并删除所有猴子补丁来强制 Socket.IO 不使用 eventlet。
  • 那你能单独启动你的子进程吗?我自己并没有真正与执行者合作过,我更喜欢将事情分开并自行启动每个服务。如果您正在寻找我看到成功使用的替代方案,Celery 和 RedisQueue 都很好,但我坚持要确保这些工作进程不使用 eventlet,因为您似乎正在使用 eventlet 不支持的功能。