【问题标题】:Queuing large number of celery tasks排队大量芹菜任务
【发布时间】:2020-01-04 22:11:02
【问题描述】:

我正在使用 Celery 分布式任务调度库编写一个 python3 应用程序。 工人正在使用greenlet线程进行处理。该任务是与网络操作相关的 I/O。

我需要将大量 celery 任务作为单个组插入。在这种情况下,一次大约有 10000 (10k) 个 url,每个作为单独的 celery 任务。

像单个组这样的插入,在本地主机上运行 redis 或 rabbitmq 需要将近 12 秒。这太长了。

问:有什么方法可以优化这种使用 celery 的批量插入?

在其他线程中,我发现人们特别喜欢使用块,但是当我在块中提交它时 - 单个块正在单个线程中处理(不使用 greenlets,这是因为阻塞 IO 对工作人员操作是必要的)。这会导致性能下降。考虑以下数字:

  1. 无块:插入 12 秒,处理 9 秒。
  2. 使用块:插入 3 秒,处理 27 秒。

因此使用块是不可能的,因为阻塞的网络操作会扼杀 greenlet 线程的性能优势。

soa = open('input.txt').readlines()
for line in soa:
    line = line.strip()
    s = line.split(':')
    l.append(check.s(s[0], s[1]))
    #l.append(s)
t = time.time()

res = check.chunks(l, 10)()
#print(res.get())
print("Submission taken %f" % (time.time() - t))

exit()

块结果:提交耗时 2.251796 秒

l = []

soa = open('input.txt').readlines()

for line in soa:
    line = line.strip()
    s = line.split(':')
    l.append(s)

job = group(l)
t = time.time()
result = job.apply_async()
print("Submission taken %f" % (time.time() - t))

常规结果:提交耗时 12.54412 秒

【问题讨论】:

    标签: python celery


    【解决方案1】:

    Celery 确实有一个名为 Group 和 Chunk 的任务包装器。

    https://docs.celeryproject.org/en/latest/userguide/canvas.html

    我认为,Chunk 需要一个结果后端,但只需将您的任务分成 50 到 200 个 URL 的组,就可以让 Celery 为您优化。

    但是,如果您正在执行 10000 个网络绑定任务,那么它会花费很长时间。

    【讨论】:

    • 我不知道你为什么用我知道的东西来回答,就像最初的帖子所说的那样。它提到了组和块。块在这种情况下不起作用,因为块会杀死网络事件并发的可能性。
    • 这是一个公平的问题。但是,如果您坚持使用 Celery,则实际上没有其他选择。我不确定如何在 Celery 中进行配置文件,但这可能是一个很好的谷歌。将您的工人数量增加到一定数量也可能有效。 Asyncio 也可能更好,因为每个工作人员可能会在触发下一个请求之前等待响应。