【发布时间】:2019-08-12 01:06:09
【问题描述】:
我有以下问题:
g 数据生成器,每个都在它单独的Process 中,产生n 值,我需要以循环方式(通过g)写出。所以对于生成器A、B、C,输出必须是有序的:
<value 1 of A>
<value 1 of B>
<value 1 of C>
<value 2 of A>
<value 2 of B>
<value 2 of C>
<value 3 of A>
...
在功能上它可以工作,但仍然比单线程慢得多。由于我需要生成大量值,因此我想并行化生成(可能还有输出,这是我目前无法做到的)。
代码(更新为使用 mp.Pipe 并包含单独的数据生成器):
import random
import argparse
import multiprocessing as mp
import sys
class DataGenerator:
_id = 0
def __init__(self, id: int, **kwargs):
self._id = id
def generate(self):
return '%03d:%4x' % (self._id, random.getrandbits(16))
def produce(generator, pipe, num: int):
p_read, p_write = pipe
i = 0
while i < num:
i += 1
p_write.send(generator.generate())
def consume(pipes: list, num: int):
i = 0
p_count = len(pipes)
while i < num:
# enforce round-robin printing...
p_idx = i % p_count
p_read, p_write = pipes[p_idx]
i += 1
sys.stdout.write(p_read.recv() + '\n')
def multi_processed(num: int, processes: int):
per_process = int(num / processes)
if num % processes != 0:
per_process += 1
q = list()
g = list()
for i in range(processes):
q.append(mp.Pipe(False))
g.append(DataGenerator(i + 1))
procs = list()
for i in range(processes):
p = mp.Process(target=produce, args=(g[i], q[i], per_process))
p.start()
procs.append(p)
consume(q, num)
for p in procs:
p.join()
def single_threaded(num: int, processes: int):
g = list()
for i in range(processes):
g.append(DataGenerator(i + 1))
for i in range(num):
j = i % processes
print(g[j].generate())
def main():
parser = argparse.ArgumentParser(description='Threading test')
parser.add_argument(
'--count', '-c', dest='count', type=int, default=1000000,
help='How many total iterations (default: 1000000)')
parser.add_argument(
'--threads', '-t', dest='threads', type=int, default=1,
help='how many threads to use (default: 1 - single-threaded)')
args = parser.parse_args()
if args.threads > 1:
multi_processed(args.count, abs(args.threads))
else:
single_threaded(args.count, mp.cpu_count())
if __name__ == '__main__':
main()
在执行时,它占用了我所有的 4 个 CPU 内核,但在性能方面它比顺序执行慢:
10,000,000 个总值单线程的执行时间:
$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000
real 0m16.557s
user 0m16.443s
sys 0m0.437s
...multiprocessing 实现也是如此:
$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000
real 1m6.446s
user 3m10.073s
sys 0m54.274s
不使用mp.Queue 并直接在produce 循环内打印生成的值给了我大约9.6 秒,但当然,输出行没有确定的顺序。
如何加快速度?
更新 #1
使用mp.Array 不是共享缓冲区的选项,因为我需要对字符串数组使用 ctype c_wchar_p,根据docs,这根本行不通。
更新 #2
将mp.Queue(1000) 替换为mp.Pipe(False),这将1000 万个值的时间缩短至约45 秒。生产者进程现在对 CPU 的占用要少得多,而消费者是明显的瓶颈:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5943 ancoron 20 0 28420 15168 8348 R 99.9 0.0 0:12.23 `- python3 threading_output.py --threads 4 --count 10000000
5947 ancoron 20 0 28284 10336 3536 R 29.9 0.0 0:03.69 `- python3 threading_output.py --threads 4 --count 10000000
5948 ancoron 20 0 28284 10336 3536 R 30.8 0.0 0:03.71 `- python3 threading_output.py --threads 4 --count 10000000
5949 ancoron 20 0 28284 10336 3536 R 30.8 0.0 0:03.71 `- python3 threading_output.py --threads 4 --count 10000000
5950 ancoron 20 0 28284 10340 3536 R 29.0 0.0 0:03.58 `- python3 threading_output.py --threads 4 --count 10000000
更新 #3
我尝试使用cinda,使用简单的BytesQueue,将其缩短到约23 秒。仍然比单线程慢。
【问题讨论】:
-
???他们不需要,这不会有任何区别,因为生产者只运行
put值到Queue和consume基本上用作数据明智的join。但请解释一下,为什么在此实现中缺少join会导致非空闲 CPU 周期? -
我不确定它会更快,但也许pebble 可能会有所帮助?我有一些经验,我认为在你的情况下 pebble.map 函数(它按照你给地图的顺序返回结果的迭代器)可能正是你正在寻找的。我可以肯定地说,它会加快速度,而且不会花很长时间。调整您当前的实施超出了我的范围,并祝您好运。不过,我可以帮助你处理 Pebble。
-
谢谢,我去看看。
-
@Ancoron,实际上在 i7 Intel proc 上,我发现上面复制粘贴的代码没有问题。您的实际代码中是否还有其他额外的上下文?
-
@Anocoron 对不起,你能证实我的想法吗?如果我对你的理解比你想要这样的输出,对吧? “A1”、“B1”、“C1”、“A2”、“B2”、“C2”、“A3”、“B3”、“C3”,(以此类推)
标签: python python-3.x python-multithreading