【发布时间】:2020-02-01 14:15:23
【问题描述】:
我目前遇到一个特别棘手的问题,我会尽力解释它。
我有一个 Django 项目,它的主要目的是快速执行数据库中的排队任务。我使用 Celery 和 Celerybeat 通过 Django 通道来实现这一点,以实时更新我的模板和响应。
Celery worker 是一个 gevent 工作池,具有相当数量的线程。
我的任务(简化版):
@shared_task
def exec_task(action_id):
# execute the action
action = Action.objects.get(pk=action_id)
response = post_request(action)
# update action status
if response.status_code == 200:
action.status = 'completed'
else:
action.status = 'failed'
# save the action to the DB
action.save()
channel_layer = get_channel_layer()
status_data = {'id': action.id, 'status': action.status}
status_data = json.dumps(status_data)
try:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
except:
event_loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
result = future.result()
我的错误:
[2019-10-03 18:47:59,990: WARNING/MainProcess] 动作排队:25
[2019-10-03 18:48:02,206:警告/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning:从未等待协程“AsyncToSync.main_wrap”
self._read_event = io_class(fileno, 1)RuntimeWarning: Enable tracemalloc 获取对象分配回溯
[2019-10-03 18:48:02,212:警告/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning:协程“BaseEventLoop.shutdown_asyncgens”从来没有 等待 self._read_event = io_class(fileno, 1) RuntimeWarning:
最初在我将操作保存到我刚刚调用的数据库之后:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
但我一直收到运行时错误,因为如果已经有一个 asyncio 事件循环已经在运行,您就不能使用 async_to_sync,as shown here at line 61。所以我有多个 gevent 线程试图 async_to_sync 非常靠近,不断地在链接中抛出错误。
这让我找到了this wonderful answer 和当前版本的 exec_task,它在向 Django Channels 组发送消息时的成功率为 98%,但我真的需要它是 100%。
这里的问题是,在我添加的协程有机会完成之前,偶尔会停止 asyncio 事件循环,并且我一直在调整我的代码,使用 asyncio 和事件循环 api,但我要么破坏我的代码,要么得到更差的结果。我感觉这可能与 Asgiref async_to_sync 函数提前关闭循环有关,但这很复杂,我几天前才开始使用 python async。
欢迎任何反馈、cmets、提示或修复!
干杯。
【问题讨论】:
-
您应该使用
except RunetimeError:,这样您就不会意外忽略可能遇到的其他错误。 -
感谢我在
except RuntimeError:中添加,遗憾的是日志没有变化。 -
我不认为它会解决你的问题,但它至少可以让你免于未来的调试噩梦。
标签: django celery gevent channels python-3.7