【发布时间】:2020-01-04 22:11:02
【问题描述】:
我正在使用 Celery 分布式任务调度库编写一个 python3 应用程序。 工人正在使用greenlet线程进行处理。该任务是与网络操作相关的 I/O。
我需要将大量 celery 任务作为单个组插入。在这种情况下,一次大约有 10000 (10k) 个 url,每个作为单独的 celery 任务。
像单个组这样的插入,在本地主机上运行 redis 或 rabbitmq 需要将近 12 秒。这太长了。
问:有什么方法可以优化这种使用 celery 的批量插入?
在其他线程中,我发现人们特别喜欢使用块,但是当我在块中提交它时 - 单个块正在单个线程中处理(不使用 greenlets,这是因为阻塞 IO 对工作人员操作是必要的)。这会导致性能下降。考虑以下数字:
- 无块:插入 12 秒,处理 9 秒。
- 使用块:插入 3 秒,处理 27 秒。
因此使用块是不可能的,因为阻塞的网络操作会扼杀 greenlet 线程的性能优势。
soa = open('input.txt').readlines()
for line in soa:
line = line.strip()
s = line.split(':')
l.append(check.s(s[0], s[1]))
#l.append(s)
t = time.time()
res = check.chunks(l, 10)()
#print(res.get())
print("Submission taken %f" % (time.time() - t))
exit()
块结果:提交耗时 2.251796 秒
l = []
soa = open('input.txt').readlines()
for line in soa:
line = line.strip()
s = line.split(':')
l.append(s)
job = group(l)
t = time.time()
result = job.apply_async()
print("Submission taken %f" % (time.time() - t))
常规结果:提交耗时 12.54412 秒
【问题讨论】: