【发布时间】:2020-03-19 09:57:54
【问题描述】:
我正在尝试制定自动缩放如何与具有复杂基于链的工作流的 celery 一起使用。
当前流程:
下载 30 个 CSV(每 6 小时),然后创建一个 celery 组来监控每个 CSV 被预处理并保存在 /tmp/folder/ 上的各个线程
在所有任务都成功并且 celery 组返回 True 后,/tmp/folder/ 被压缩并存储在 S3 上,并通过 API 调用通知其他系统。
面临的挑战:
我们有大约 40-50 个任务的大量待处理任务,这使得整个过程非常缓慢。
建议的解决方案:
自动缩放,即根据待处理任务的数量添加更多工作服务器。
这种方法是否适用于我拥有的工作流程?或者是否有可能有一个垂直缩放的解决方案?解决这个问题的最佳方法是什么?
@app.task
def process_csv(path_of_csv):
# preprocessing the csv and storing in /tmp/folder/
return True
res = group(process_csv.s(path) for path in all_paths)()
with allow_join_result():
print(res.get())
if 'False' not in res.get():
# Time to store to S3
环境信息
aiodns==2.0.0
aiohttp==3.5.4
amqp==2.5.0
async-timeout==3.0.1
attrs==19.1.0
Babel==2.7.0
billiard==3.5.0.5
boto3==1.9.197
botocore==1.12.197
celery==4.1.1
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
ddtrace==0.31.0
Django==2.2.3
django-enumfields==1.0.0
django-extensions==2.2.1
djangorestframework==3.10.1
docutils==0.14
flower==0.9.3
gevent==1.4.0
greenlet==0.4.15
gunicorn==19.9.0
idna==2.8
idna-ssl==1.1.0
jmespath==0.9.4
kombu==4.6.3
multidict==4.5.2
psutil==5.6.5
psycopg2-binary==2.8.3
pycares==3.0.0
pycparser==2.19
python-dateutil==2.8.0
pytz==2019.1
redis==3.3.0
requests==2.22.0
s3transfer==0.2.1
six==1.12.0
slackclient==2.0.0
sqlparse==0.3.0
tornado==5.1.1
typing==3.7.4
typing-extensions==3.7.4
urllib3==1.25.3
vine==1.3.0
websocket==0.2.1
websocket-client==0.56.0
Werkzeug==0.15.5
yarl==1.3.0
我使用 RabbitMQ 代理和 Redis 作为后端。
谢谢
【问题讨论】:
标签: python-3.x rabbitmq celery autoscaling