【问题标题】:Python Threading stdin/stdoutPython 线程标准输入/标准输出
【发布时间】:2013-08-21 04:41:57
【问题描述】:

我有一个包含大量数据的文件。每一行都是一条记录。我正在尝试对整个文件进行一些 ETL 工作。现在我正在使用标准输入逐行读取数据。很酷的一点是,您的脚本可以非常灵活地与其他脚本和 shell 命令集成。我将结果写入标准输出。例如。

$ cat input_file
line1 
line2
line3
line4
...

我当前的 python 代码看起来像这样 - parse.py

import sys
for line in sys.stdin:
    result = ETL(line)    # ETL is some self defined function which takes a while to execute.
    print result

下面的代码是它现在的工作方式:

cat input_file | python parse.py > output_file

我查看了 Python 的 Threading 模块,我想知道如果我使用该模块,性能是否会显着提高。

问题1:我应该如何规划每个线程的配额,为什么?

...
counter = 0
buffer = []
for line in sys.stdin:
    buffer.append(line)
    if counter % 5 == 0:   # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine
        counter = 0
        thread = parser(buffer)
        buffer = []
        thread.start() 

问题2:多个线程可能同时将结果打印回stdout,如何组织,避免出现以下情况?

import threading
import time

class parser(threading.Thread):
    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            print elem + 'Finished'

work = ['a', 'b', 'c', 'd', 'e', 'f']

thread1 = parser(['a', 'b'])  
thread2 = parser(['c', 'd'])
thread3 = parser(['e', 'f'])

thread1.start()
thread2.start()
thread3.start()   

输出真的很难看,其中一行包含来自两个线程的输出。

aFinished
cFinishedeFinished

bFinished
fFinished
dFinished

【问题讨论】:

  • 能否链接“Python 的线程模块”。无论如何,线程在访问文件时并不是一件好事,恕我直言。您需要通过锁和信号量以及作品来定义哪些核心可以访问什么以及何时访问。由于大部分工作是 I/O 工作,而不是 CPU 工作,因此您可能不会看到性能大幅提升。

标签: python multithreading stdout stdin


【解决方案1】:

首先回答您的第二个问题,这就是mutexes 的用途。通过使用锁在解析器之间进行协调,并确保在给定的时间段内只有一个线程可以访问输出流,您可以获得所需的更清晰的输出:

class parser(threading.Thread):
    output_lock = threading.Lock()

    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            with self.output_lock:
                print elem + 'Finished'

关于您的第一个问题,请注意,多线程可能不会为您的特定工作负载带来任何好处。这在很大程度上取决于您对每条输入线(您的ETL 函数)所做的工作主要是受CPU 限制还是受IO 限制。如果是前者(我怀疑很可能),线程将无济于事,因为global interpreter lock。在这种情况下,您可能希望使用multiprocessing 模块在多个进程而不是多个线程之间分配工作。

但是您可以通过更易于实施的工作流程获得相同的结果:将输入文件拆分为 n 部分(例如,使用 split 命令);在每个子文件上分别调用提取和转换脚本;然后连接生成的输出文件。

一个挑剔:“使用标准输入逐行读取数据,因为它不会将整个文件加载到内存中”涉及一种误解。您可以在 Python 中逐行读取文件,例如,将sys.stdin 替换为以下构造中的文件对象:

for line in sys.stdin:

另见文件对象的readline()方法,注意read()可以将读取的最大字节数作为参数。

【讨论】:

  • 你的帖子中有很多很棒的东西,Alp。我对您的 cmets CPU-bound/IO-bound 非常感兴趣。我想知道您是否有办法确定 CPU/IO 占用了多少时间和资源?顺便说一句,他们之所以使用 stdIO 是因为您可以将脚本与 Shell Command 集成,这使得它更加灵活和方便。感谢您对“Memory midsunderstanding”的更正。
【解决方案2】:

线程是否对您有帮助很大程度上取决于您的情况。特别是,如果您的 ETL() 函数涉及大量磁盘访问,那么线程可能会给您带来显着的速度提升。

在回答您的第一个问题时,我一直认为这取决于您。在确定理想的线程数时,有很多因素在起作用,其中许多因素取决于程序。例如,如果您正在执行大量磁盘访问(这非常慢),那么您将希望更多线程在等待磁盘访问时利用停机时间。但是,如果程序是 CPU 密集型的,那么大量的线程可能不会很有帮助。因此,虽然可以分析所有因素以得出理想的线程数,但进行初始猜测然后从那里进行调整通常要快得多。

不过,更具体地说,为每个线程分配一定数量的行可能不是分配工作的最佳方式。例如,考虑一下,如果一条线需要特别长的时间来处理。如果一个线程可以在这一行上工作,而其他线程可以同时多做几行,那将是最好的。处理此问题的最佳方法是使用队列。如果将每一行推入一个队列,那么每个线程都可以从队列中拉出一行,处理它,然后重复直到队列为空。这样一来,工作就被分配了,这样就没有一个线程没有工作要做(当然,直到最后)。

现在,第二个问题。您绝对正确,一次从多个线程写入标准输出并不是一个理想的解决方案。理想情况下,您会安排一些事情,以便只在一个地方写入标准输出。一种很好的方法是使用队列。如果您让每个线程将其输出写入共享队列,那么您可以生成一个额外的线程,其唯一任务是将项目从该队列中拉出并将它们打印到标准输出。通过将打印限制为仅一个线程,您将避免尝试一次打印的多个线程所固有的问题。

【讨论】: