【问题标题】:Specific stdout line order with multi-processing具有多处理的特定标准输出行顺序
【发布时间】:2019-08-12 01:06:09
【问题描述】:

我有以下问题:

g 数据生成器,每个都在它单独的Process 中,产生n 值,我需要以循环方式(通过g)写出。所以对于生成器ABC,输出必须是有序的:

<value 1 of A>
<value 1 of B>
<value 1 of C>
<value 2 of A>
<value 2 of B>
<value 2 of C>
<value 3 of A>
...

在功能上它可以工作,但仍然比单线程慢得多。由于我需要生成大量值,因此我想并行化生成(可能还有输出,这是我目前无法做到的)。

代码(更新为使用 mp.Pipe 并包含单独的数据生成器):

import random
import argparse
import multiprocessing as mp
import sys


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return '%03d:%4x' % (self._id, random.getrandbits(16))


def produce(generator, pipe, num: int):
    p_read, p_write = pipe
    i = 0
    while i < num:
        i += 1
        p_write.send(generator.generate())


def consume(pipes: list, num: int):
    i = 0
    p_count = len(pipes)

    while i < num:
        # enforce round-robin printing...
        p_idx = i % p_count
        p_read, p_write = pipes[p_idx]

        i += 1
        sys.stdout.write(p_read.recv() + '\n')


def multi_processed(num: int, processes: int):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    q = list()
    g = list()
    for i in range(processes):
        q.append(mp.Pipe(False))
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)

    consume(q, num)

    for p in procs:
        p.join()


def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    for i in range(num):
        j = i % processes
        print(g[j].generate())


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    args = parser.parse_args()

    if args.threads > 1:
        multi_processed(args.count, abs(args.threads))
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()

在执行时,它占用了我所有的 4 个 CPU 内核,但在性能方面它比顺序执行慢:

10,000,000 个总值单线程的执行时间:

$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000

real    0m16.557s
user    0m16.443s
sys     0m0.437s

...multiprocessing 实现也是如此:

$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000

real    1m6.446s
user    3m10.073s
sys     0m54.274s

不使用mp.Queue 并直接在produce 循环内打印生成的值给了我大约9.6 秒,但当然,输出行没有确定的顺序。

如何加快速度?

更新 #1

使用mp.Array 不是共享缓冲区的选项,因为我需要对字符串数组使用 ctype c_wchar_p,根据docs,这根本行不通。

更新 #2

mp.Queue(1000) 替换为mp.Pipe(False),这将1000 万个值的时间缩短至约45 秒。生产者进程现在对 CPU 的占用要少得多,而消费者是明显的瓶颈:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
 5943 ancoron   20   0   28420  15168   8348 R  99.9   0.0   0:12.23 `- python3 threading_output.py --threads 4 --count 10000000
 5947 ancoron   20   0   28284  10336   3536 R  29.9   0.0   0:03.69     `- python3 threading_output.py --threads 4 --count 10000000
 5948 ancoron   20   0   28284  10336   3536 R  30.8   0.0   0:03.71     `- python3 threading_output.py --threads 4 --count 10000000
 5949 ancoron   20   0   28284  10336   3536 R  30.8   0.0   0:03.71     `- python3 threading_output.py --threads 4 --count 10000000
 5950 ancoron   20   0   28284  10340   3536 R  29.0   0.0   0:03.58     `- python3 threading_output.py --threads 4 --count 10000000

更新 #3

我尝试使用cinda,使用简单的BytesQueue,将其缩短到约23 秒。仍然比单线程慢。

【问题讨论】:

  • ???他们不需要,这不会有任何区别,因为生产者只运行put 值到Queueconsume 基本上用作数据明智的join。但请解释一下,为什么在此实现中缺少 join 会导致非空闲 CPU 周期?
  • 我不确定它会更快,但也许pebble 可能会有所帮助?我有一些经验,我认为在你的情况下 pebble.map 函数(它按照你给地图的顺序返回结果的迭代器)可能正是你正在寻找的。我可以肯定地说,它会加快速度,而且不会花很长时间。调整您当前的实施超出了我的范围,并祝您好运。不过,我可以帮助你处理 Pebble。
  • 谢谢,我去看看。
  • @Ancoron,实际上在 i7 Intel proc 上,我发现上面复制粘贴的代码没有问题。您的实际代码中是否还有其他额外的上下文?
  • @Anocoron 对不起,你能证实我的想法吗?如果我对你的理解比你想要这样的输出,对吧? “A1”、“B1”、“C1”、“A2”、“B2”、“C2”、“A3”、“B3”、“C3”,(以此类推)

标签: python python-3.x python-multithreading


【解决方案1】:

好的,所以我做了一些测试,现在我很困惑。我做了多线程和异步解决方案,但都没有特别好。我还复制并粘贴了你的代码,它总是挂起,即使它“完成”了。

请注意,在我的代码中,我使用了作为 TID 给出的数字,而不是 4 个随机的十六进制数字,因为我想确保它在做你想做的事。用另一种方式更难分辨,而且可以很容易地改为十六进制。

单线程:

import random
import sys

def generate():
    return random.randrange(-10, 10)

if len(sys.argv) < 2:
    print("NEED ARGS")
    exit(0)

num = int(sys.argv[1])
for x in range(num):
    for _ in range(x):
        print("[{}]: {}".format(x, generate()))

多线程:

from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys 

def generate():
    return random.randrange(-10, 10) 

def produce(num):
    #tid = '%04x' % random.getrandbits(16)
    tid = num 
    i = 0 
    while i < num:
        print('[%s] %3d' % (tid, generate()))
        i += 1

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    with ThreadPool(max_workers=mp.cpu_count()) as pool:
        future = pool.map(produce, range(num), timeout=3)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print(error)
                break
            except ProcessExpired as error:
                print(error)
                break
            except Exception as error:
                print(error)
                break

说实话,我没有看到速度有很大的变化。多处理的实际上更慢,这几乎是最基本的。我刚刚记得的是PyPy,它以计算速度而闻名。我真的不想设置它,但考虑到问题的简单、重复和纯计算性质,我认为它可能是您的解决方案。

基准是:

100 次迭代 0.3 秒

单次 10 秒,多 11 秒,1000 次迭代

我放弃了,因为它需要多长时间。我不知道如何描述它,但每增加一个量级,你就会多做 100 倍的工作。使用高斯模式的证明:

你正在做每个数字的总和,直到 num,这意味着 1 + 2 + ... 并且高斯的模式涵盖了这一点。这应该可以大致了解它有多大:

10 作为输入需要 550 次迭代

100 作为输入需要 5050 次迭代

1000 作为输入需要 500500 次迭代

10000 作为输入需要 50005000 次迭代

通过 excel 输入数据后,它是 O(n^2),我猜这还不错。如果你好奇的话,这个等式是 ~.55x^2。

您是否介意链接您制作的其他程序变体,以便我可以将它们与我自己的程序进行对比?因为老实说,我很想知道它们是否工作正常/我做错了什么。

Tl;DR:您使用了哪些测试/代码以便我进行比较?你试过 PyPy 吗?与打印数字相比,数据库可以吗(几乎可以肯定会更快)?您是如何设法让您的程序以如此快的速度单线程运行的?

希望这会有所帮助!

编辑:只是检查一下,您确实想要执行以下操作,对吗?在第一次迭代中,您打印一次 ID 和一个随机数。在第二次迭代中,您两次打印 ID 和随机数。只是想检查一下。

编辑 2:代码应该是固定的。

from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys

def generate():
    return random.randrange(-10, 10)

def produce(num):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        print('[%s] %3d' % (tid, generate()))

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)

    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(produce, (num_per_worker for _ in range(workers)),
                          chunksize=round(num/1024))
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break

编辑 3:循环法

from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys 
from functools import partial

def generate():
    return random.randrange(-10, 10) 

def produce(num, magic_array):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        magic_array.append('[%s] %3d' % (tid, generate()))

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)
    magic_array = []

    #This is the how the magic array is used as an argument.
    #There's probably a better way to do it, but I don't know it
    part_produce = partial(produce, magic_array=magic_array)
    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(part_produce, (num_per_worker for _ in range(workers)), chunksize=num_per_worker)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break

    #This is the important part. For every iteration/worker unit, it will go
    #through the list in steps of iteration/worker units, with a start offset
    #of x
    #Just printing takes about 5 seconds, but I don't think there's a faster
    #method because printing takes a long time anyway
    for x in range(num_per_worker):
        for y in magic_array[x::num_per_worker]:
            print(y)

【讨论】:

  • “只是为了检查”:不。正如我在开头和我的测试代码(这是per_process = int(num / processes) 的原因)中尝试描述的那样,我需要使用processes 生成器生成num 输出行,最后每个都有自己的配置(我忽略了只需使用随机的generate 进行测试。最大的问题实际上是需要按顺序迭代生成器的输出。我将使用我正在使用的完整测试程序更新代码。
  • 我可能会误解您的代码,但如果我使用单线程,每个输出都会得到相同的十六进制值。多线程没有同样的问题(当我使用多个线程作为参数时)
  • 是的,这是用于测试的。每个进程/线程有一个生成器。当然,打破这种相关性的解决方案也受到高度赞赏。 :-)
  • 新代码已上线。我在笔记本电脑上,所以我不知道上面的代码在你的电脑上运行得有多好,但我认为这是你想要的。如果你想要一个不同的生成器,你所要做的就是用你想使用的任何生成器替换未来变量下的生成器。如果你给我一个例子,我可以尝试为你做。我的基准测试非常糟糕,但它们大约是 35 秒,而不是你的一分钟多。
  • 非常感谢您的更新,在我的机器上它给了我大约 16.5 秒的 10m 值,基本上与我使用没有输出排序的情况相同。但尤其是值的循环排序(在任意数量的生成器上)输出是这里的主要功能标准。
【解决方案2】:

在摆弄了许多不同的选项之后,我的(目前是最终的)解决方案将使用 Memory mapped file 并且不输出到标准输出。

在实现更类似于生成器输出类型之一的更改后(UUID - 36 个字符 + 新行 - 来自生成器的 16 字节值),我机器上的结果数字是:

单线程

$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000

real    0m15.915s
user    0m16.045s
sys     0m0.629s

使用 cinda BytesQueue 进行多处理

$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000

real    0m30.005s
user    0m53.543s
sys     0m28.072s

使用 mmap 进行多处理

$ time python3 threading_output.py --threads 4 --count 10000000 --output-file test.txt

real    0m6.637s
user    0m18.688s
sys     0m1.265s

加速系数约为 2.4,这看起来要好得多。虽然我们在这里使用了更多的内存,但系统应该允许

详情

mmap 版本的实际代码是利用内存映射文件的切片符号写入访问,以便每个进程都可以写入自己的“槽”。为避免使用过多内存,每个进程还仅映射输出文件的特定区域 (mmap.PAGESIZE * &lt;bytes_per_entry&gt; * 100 * &lt;processes&gt;),并在每个进程的 mmap.PAGESIZE * 100 条目之后重新映射下一个区域。

完整的测试代码

mmap 版本是用函数mmappedproduce_mm 实现的:


import random
import argparse
import multiprocessing as mp
import sys
from cinda.ipc.queue import BytesQueue as Queue
from cinda.ipc import free
import mmap
import os


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return self._id.to_bytes(8, 'big') + random.getrandbits(64).to_bytes(8, 'big')


def produce(generator, q, num: int):
    i = 0
    while i < num:
        i += 1
        q.put(generator.generate())


def consume(queues: list, num: int):
    i = 0
    q_count = len(queues)

    while i < num:
        # enforce round-robin printing...
        q = queues[i % q_count]

        i += 1
        hex = q.get().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def multi_processed(num: int, processes: int):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    q = list()
    g = list()
    for i in range(processes):
        name = 'gen-%d' % i
        free(name)
        q.append(Queue(name, 10000, 16))
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)

    consume(q, num)

    for p in procs:
        p.join()


def produce_mm(generator, out_file, num: int, offset: int, num_total: int, processes: int):
    entry_size = 37 # number of bytes per entry
    seek_relative = processes * entry_size # relative offset to next slot
    seek_offset = offset * entry_size # initial slot offset for this process
    buffer_switch = mmap.PAGESIZE * 100 # number of "my" entries per buffer
    buffer_num = buffer_switch * processes # number of entries to mmap at any time
    buffer_size = buffer_num * entry_size # actual mmap'd buffer size
    buffer_offset = 0

    size = num_total * entry_size

    with open(out_file, "r+b") as f:
        mem = mmap.mmap(f.fileno(), min(num_total * entry_size, buffer_size), access=mmap.ACCESS_WRITE)

        # generate and write the first entry (saves an if-clause later)
        hex = generator.generate().hex()
        mem[seek_offset:(seek_offset + entry_size)] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')

        (i, j) = (1, 1)
        while i < num:
            # close current, create next buffer and reset entry offset
            if (i % buffer_switch) == 0:
                mem.flush()
                mem.close()
                buffer_offset += buffer_size
                buffer_size = min(size - buffer_offset, buffer_size)
                #sys.stderr.write('New buffer[%d] at %d\n' % (buffer_size, buffer_offset))
                mem = mmap.mmap(f.fileno(), buffer_size, access=mmap.ACCESS_WRITE, offset=buffer_offset)
                j = 0

            # calculate [start:end] offsets for this slot
            off_start = seek_relative * j + seek_offset
            off_end = off_start + entry_size
            if off_end > buffer_size:
                break

            hex = generator.generate().hex()
            try:
                mem[off_start:off_end] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')
            except IndexError as e:
                sys.stderr.write('%s (tried [%d:%d] for mmap size %d, offset %d)\n' % (e, off_start, off_end, buffer_size, j))
                break

            (i, j) = (i + 1, j + 1)


def mmapped(num: int, processes: int, out_file):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    with open(out_file, "wb") as f:
        f.seek((num * 37 - 1), os.SEEK_SET)
        f.write(b'\0')
        f.flush()

    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce_mm, args=(g[i], out_file, per_process, i, num, processes))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()


def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    for i in range(num):
        j = i % processes
        hex = g[j].generate().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    parser.add_argument(
        '--output-file', '-o', dest='out_file', type=str,
        help='specify output file to write into')
    args = parser.parse_args()

    if args.threads > 1:
        if args.out_file is None:
            multi_processed(args.count, abs(args.threads))
        else:
            mmapped(args.count, abs(args.threads), args.out_file)
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()

【讨论】:

  • 我回来了,所以我可以发布this 文章。我认为它可能会回答为什么最初会出现这样的速度不足的部分原因。很高兴看到您找到了解决方案。
猜你喜欢
  • 1970-01-01
  • 2019-04-16
  • 1970-01-01
  • 2019-01-16
  • 2013-06-13
  • 2013-05-14
  • 1970-01-01
  • 1970-01-01
  • 2020-11-25
相关资源
最近更新 更多