【问题标题】:相同的代码,多线程比多处理快 900 倍
【发布时间】:2022-01-22 05:17:34
【问题描述】:

您需要下载fonts.zip 并将其解压缩到与代码相同的文件夹中才能运行示例。

此代码的目的是生成随机文本,将文本渲染并保存为图像。该代码接受lettersnumbers,它们分别是用于生成文本的字母和数字的群体。它还接受character_frequency,它决定了每个字符将生成多少个实例。然后生成一个长字符串,并将其拆分为存储在TextGenerator.dataset 属性中的随机大小的子字符串,该属性由TextGenerator.initialize_dataset 产生。

例如:对于字母 = 'abc'、数字 = '123'、字符频率 = 3,'aaabbbccc111222333' 被生成、打乱并拆分为随机大小的子字符串 例如:['a312c'、'1b1'、'bba32c3a2c ']。

然后每个单词将被渲染并保存为来自TextGenerator.save_images 的图像,这是这个问题的主题。

下面的示例中有executor 参数将concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor 传递给TextGenerator,用于演示目的。

有什么问题?

character_frequency 增加的越多,存储在TextGenerator.dataset 中的数据集就越长,但是它应该不会影响性能。实际发生的情况:character_frequency 越多,TextGenerator.save_images 完成 concurrent.futures.ProcessPoolExecutor 所需的时间就越长。另一方面,一切都保持不变,而是通过concurrent.futures.ThreadPoolExecutor,所需的时间是恒定的,不受character_frequency的影响。

import random
import string
import tempfile
import textwrap
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from pathlib import Path
from time import perf_counter

import numpy as np
import pandas as pd
from cv2 import cv2
from PIL import Image, ImageDraw, ImageFont


class TextGenerator:
    def __init__(
        self,
        fonts,
        character_frequency,
        executor,
        max_images=None,
        background_colors=((255, 255, 255),),
        font_colors=((0, 0, 0),),
        font_sizes=(25,),
        max_example_size=25,
        min_example_size=1,
        max_chars_per_line=80,
        output_dir='data',
        workers=1,
        split_letters=False,
    ):
        assert (
            min_example_size > 0
        ), f'`min_example_size` should be > 0`, got {min_example_size}'
        assert (
            max_example_size > 0
        ), f'`max_example_size` should be > 0`, got {max_example_size}'
        self.fonts = fonts
        self.character_frequency = character_frequency
        self.executor = executor
        self.max_images = max_images
        self.background_colors = background_colors
        self.font_colors = font_colors
        self.font_sizes = font_sizes
        self.max_example_size = max_example_size
        self.min_example_size = min_example_size
        self.max_chars_per_line = max_chars_per_line
        self.output_dir = Path(output_dir)
        self.workers = workers
        self.split_letters = split_letters
        self.digits = len(f'{character_frequency}')
        self.max_font = max(font_sizes)
        self.generated_labels = []
        self.dataset = []
        self.dataset_size = 0

    def render_text(self, text_lines):
        font = random.choice(self.fonts)
        font_size = random.choice(self.font_sizes)
        background_color = random.choice(self.background_colors)
        font_color = random.choice(self.font_colors)
        max_width, total_height = 0, 0
        font = ImageFont.truetype(font, font_size)
        line_sizes = {}
        for line in text_lines:
            width, height = font.getsize(line)
            line_sizes[line] = width, height
            max_width = max(width, max_width)
            total_height += height
        image = Image.new('RGB', (max_width, total_height), background_color)
        draw = ImageDraw.Draw(image)
        current_height = 0
        for line_text, dimensions in line_sizes.items():
            draw.text((0, current_height), line_text, font_color, font=font)
            current_height += dimensions[1]
        return np.array(image)

    def display_progress(self, example_idx):
        print(
            f'\rGenerating example {example_idx + 1}/{self.dataset_size}',
            end='',
        )

    def generate_example(self, text_lines, example_idx):
        text_box = self.render_text(text_lines)
        filename = (self.output_dir / f'{example_idx:0{self.digits}d}.jpg').as_posix()
        cv2.imwrite(filename, text_box)
        return filename, text_lines

    def create_dataset_pool(self, executor, example_idx):
        future_items = []
        for j in range(self.workers):
            if not self.dataset:
                break
            text = self.dataset.pop()
            if text.strip():
                text_lines = textwrap.wrap(text, self.max_chars_per_line)
                future_items.append(
                    executor.submit(
                        self.generate_example,
                        text_lines,
                        j + example_idx,
                    )
                )
        return future_items

    def write_images(self):
        i = 0
        with self.executor(self.workers) as executor:
            while i < self.dataset_size:
                future_items = self.create_dataset_pool(executor, i)
                for future_item in as_completed(future_items):
                    filename, text_lines = future_item.result()
                    if filename:
                        self.generated_labels.append(
                            {'filename': filename, 'label': '\n'.join(text_lines)}
                        )
                    self.display_progress(i)
                i += min(self.workers, self.dataset_size - i)
                if self.max_images and i >= self.max_images:
                    break

    def initialize_dataset(self, letters, numbers, space_freq):
        for characters in letters, numbers:
            dataset = list(
                ''.join(
                    letter * self.character_frequency
                    for letter in characters + ' ' * space_freq
                )
            )
            random.shuffle(dataset)
            self.dataset.extend(dataset)
        i = 0
        temp_dataset = []
        min_split_example_size = min(self.max_example_size, self.max_chars_per_line)
        total_letters = len(self.dataset)
        while i < total_letters - self.min_example_size:
            example_size = random.randint(self.min_example_size, self.max_example_size)
            example = ''.join(self.dataset[i : i + example_size])
            temp_dataset.append(example)
            i += example_size
            if self.split_letters:
                split_example = ' '.join(list(example))
                for sub_example in textwrap.wrap(split_example, min_split_example_size):
                    if (sub_example_size := len(sub_example)) >= self.min_example_size:
                        temp_dataset.append(sub_example)
                        i += sub_example_size
        self.dataset = temp_dataset
        self.dataset_size = len(self.dataset)

    def generate(self, letters, numbers, space_freq, fp='labels.csv'):
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.initialize_dataset(letters, numbers, space_freq)
        t1 = perf_counter()
        self.write_images()
        t2 = perf_counter()
        print(
            f'\ntotal time: {t2 - t1} seconds, character frequency '
            f'specified: {self.character_frequency}, type: {self.executor.__name__}'
        )
        pd.DataFrame(self.generated_labels).to_csv(self.output_dir / fp, index=False)


if __name__ == '__main__':
    out = Path(tempfile.mkdtemp())
    total_images = 15
    for char_freq in [100, 1000, 1000000]:
        for ex in [ThreadPoolExecutor, ProcessPoolExecutor]:
            g = TextGenerator(
                [
                    p.as_posix()
                    for p in Path('fonts').glob('*.ttf')
                ],
                char_freq,
                ex,
                max_images=total_images,
                output_dir=out,
                max_example_size=15,
                min_example_size=5,
            )
            g.generate(string.ascii_letters, '0123456789', 1)

在我的 i5 mbp 上产生以下结果:

Generating example 15/649
total time: 0.0652076720000001 seconds, character frequency specified: 100, type: ThreadPoolExecutor
Generating example 15/656
total time: 1.1637316500000001 seconds, character frequency specified: 100, type: ProcessPoolExecutor
Generating example 15/6442
total time: 0.06430166800000015 seconds, character frequency specified: 1000, type: ThreadPoolExecutor
Generating example 15/6395
total time: 1.2626316840000005 seconds, character frequency specified: 1000, type: ProcessPoolExecutor
Generating example 15/6399805
total time: 0.05754961300000616 seconds, character frequency specified: 1000000, type: ThreadPoolExecutor
Generating example 15/6399726
total time: 45.18768219699999 seconds, character frequency specified: 1000000, type: ProcessPoolExecutor

0.05 秒(线程)与 45 秒(进程)来保存 15 张图像,character_frequency = 1000000。为什么需要这么长时间?为什么它会受到character_frequency 值的影响?这是独立的,应该只影响初始化时间(这正是线程发生的事情)

【问题讨论】:

  • 如果这是一条规则,那么它应该适用于character_frequency = 100 和 1000 的情况,并且它应该需要相同的相对时间量,但情况并非如此。如果character_frequency 较低,则进程花费的时间非常接近等效线程,这很奇怪。
  • 将你想要序列化的代码移动到单独的函数中,并尽可能少地给它和尽可能小的参数。

标签: python multithreading performance multiprocessing


【解决方案1】:

假设我正确解释了您的代码,您将生成大小由character_frequency 值控制的示例文本。值越大,文本越长。

文本是在程序的主循环中生成的。然后你安排一组任务接收所述文本并根据它生成图像。

由于进程位于不同的内存地址空间中,因此需要通过pipe 将文本发送给它们。该管道是影响性能的瓶颈。您看到character_frequency 增长时性能下降的原因是因为需要序列化更多文本并按顺序通过所述管道发送。您的员工在等待数据到达时正在挨饿。

此问题不会影响您的线程池,因为线程位于与主进程相同的内存地址空间中。因此,数据不需要序列化并通过您的操作系统发送。

要在使用进程时加快程序速度,您可以在工作程序本身中移动文本生成逻辑,或者将所述文本写入一个或多个文件中。然后让工作进程自己打开这些文件,这样您就可以利用 I/O 并行化。您的所有主要过程都是将工作人员指向正确的文件位置或文件名。

【讨论】:

  • 所以假设我不是将结果文本保存在列表中,而是将结果示例保存到磁盘中的单独文件中,这些文件将被工作人员使用和丢弃,对吗?我不明白的是,为什么每次为工人分配一些要写入的文本而不是仅分配给他们的文本时,必须重新序列化整个字符串列表TextGenerator.dataset/仅将大列表序列化一次并在以后重用它某种缓存、服务器进程或其他什么?
  • 当您自己发送text (self.dataset.pop()) 的位时,该列表仅被序列化一次。尽管如此,您仍然需要通过单一通信渠道传递该文本。在设计基于多处理的应用程序时,需要考虑进程间通信 (IPC) 开销。这同样适用于分布式架构:如果设计不当,网络 I/O 将成为您的主要瓶颈。
  • 按照最快的解决方案:如果您的主流程中不需要文本,我会将文本生成参数发送给工作人员自己,让他们生成随机字符串。否则,您可以将它们写入一个文件并将每个文本的文件位置发送给工作人员,或者将每个文本写入一个专用文件并发送其名称。
  • 如果列表只被序列化一次,那么为什么增加它的大小会影响每个工人调用呢? character_frequency 不会影响字长,它会影响存储在 TextGenerator.dataset 中等待写入的总字数。字长通过max_example_sizemin_example_size 控制。如果发送给工作人员的每个字符串的大小增加一倍或三倍,速度下降 2 到 3 倍,那就更有意义了。如果总字数较低,则工作进程写入 5-15 个字母字符串的工作调用需要 0.00001 秒,而相同的字符串可能需要 10 秒或更长的总字数。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-15
  • 1970-01-01
  • 1970-01-01
  • 2011-08-22
  • 1970-01-01
  • 2012-10-07
相关资源
最近更新 更多