【发布时间】:2021-09-15 20:13:42
【问题描述】:
我正在开发一个基于 docker 的 Celery 支持的 Python 应用程序,其中一个功能是触发和发送给定数字的文本消息。工作流程如下:
- 用户上传包含一组条目的 CSV,文本消息应发送到这些条目
- cron 作业每 60 秒轮询一次数据库以获取任何新条目并将它们添加到队列中
- 如果发现新条目,将它们放入队列并触发短信
目前,如果我上传一个包含 3 个条目的 CSV 文件,每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条文本消息所用的时间将是 30 秒。 由于这些作业是相互独立的,我想将其并行化,以便同时发送所有三个文本消息。
我已尝试增加队列的并发性,但假设每个线程将被分配三个消息之一,但它不起作用。恐怕我可能缺少一些东西。 是否需要添加一些其他配置才能并行化作业?
运行芹菜队列的命令
celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair
芹菜配置
app = Celery(
'worker',
broker=os.environ['CELERY_BROKER'],
backend=os.environ['RABBITMQ_BACKEND'],
include=['worker.tasks','worker.schedule']
)
app.conf.update(
result_expires=3600,
task_track_started=True,
worker_prefetch_multiplier = 5
)
app.conf.beat_schedule = {
"get-message": {
"task": "worker.schedule.get_new_messages",
"schedule": 10,
'options': {'queue' : 'queue1'}
}
}
【问题讨论】:
标签: python docker rabbitmq celery