【问题标题】:Concurrent.futures usage guide - a simple example of using both threading and processingConcurrent.futures 使用指南 - 使用线程和处理的简单示例
【发布时间】:2013-12-26 10:36:18
【问题描述】:

我想使用concurrent.futures 模块为我的程序启用并行处理/线程。

不幸的是,我似乎找不到任何使用 concurrent.futures 模块的好、简单、防白痴的例子。他们通常需要更高级的 Python 知识或处理/线程概念和行话。

下面是一个基于我的程序的简化的、独立的示例:有一个纯 CPU 绑定任务非常适合多处理,还有一个单独的 IO 绑定任务插入到数据库 (SQLite)。 在我的程序中,我已经将其转换为使用多处理池类,但是由于 CPU 绑定任务的结果都被收集起来等待任务完成,它使用了大量的内存。 因此,我希望使用线程/处理的组合,我相信 concurrent.futures 可以相当简单地为我做。

那么如何将下面的内容转换成使用这个模块的东西呢?

import sqlite3

#Stand in CPU intensive task
def calculate(value):
    return value * 10

#Stand in Thread I/O intensive task
def output(value):
    global db

    if (value % 1000) == 0:
        db.execute('delete from test_table')

    db.execute('insert into test_table (result) values (?)', (value,))

def main():
    global db
    results = []

    db  = sqlite3.connect('e:\\z_dev\\test.sqlite')
    db.cursor()

    #=========
    #Perform CPU intensive task
    for i in range(1000):
        results.append( calculate(i))

    #Perform Threading intensive task
    for a in results:
        output(a)
    #=========

    db.commit()
    db.close()

if __name__ == '__main__':
    main()

我正在寻找一个不使用任何花哨/复杂的 python 的答案。或者一个很好的清晰简单的解释,或者两者兼而有之!

谢谢

编辑:我当前的“多处理器”实现。可能是错误的,但它似乎有效。没有任何线程。这进入了上面的“#=========”部分。

#Multiprocessing
pool = multiprocessing.Pool(None)
for i in range(1000):
    results.append( pool.apply_async(calculate(i)))
pool.close()
pool.join()

for i in results:
    results[i] = results[i].get()

#Complete lack of threading; but if I had it, it'd be here:     
for a in results:
    output(a)

【问题讨论】:

    标签: python multithreading python-3.x


    【解决方案1】:

    concurrent.futures 有一个简约的 API。它很容易用于非常简单的问题,但您没有非常简单的问题。如果你这样做了,你早就解决了;-)

    您没有展示您编写的任何 multiprocessing.Pool 代码,但那将是一个更有希望的起点 - 假设您更想解决问题而不是确认您希望它必须很容易如果您只切换到较弱的 API 就可以这样做;-)

    继续使用multiprocessing 的“明显”方法是使用Pool.apply_async() 方法,将异步结果对象放在有界Queue.Queue 上,并让主程序中的线程将它们从Queue 和等待结果出现。这很容易,但这不是魔术。它解决了您的问题,因为有界Queues在以不同速度运行的生产者和消费者之间进行调解的规范方式。 concurrent.futures 中没有任何内容直接解决问题,它是您“大量内存”问题的核心。

    # Define global result_queue only in the main program.
    import Queue
    result_queue = Queue.Queue(100)  # pick a reasonable max size based on your problem
    
    # Run this in as many threads as you like.
    def consume_results():
        while True:
            a = result_queue.get()
            if a is None:
                break
            output(a.get())  # `output()` is your function
    
    ...
    # main program passes out work, after starting threads
    for i in range(1000):
        # the .put() will block so long as the queue is at its max size
        result_queue.put(pool.apply_async(calculate, args=(i,)))
    # add sentinels to let threads know they're done
    for i in range(number_of_threads_you_started):
        result_queue.put(None)
    

    这是您需要保持生产者和消费者大致平衡的东西,并且在任何标准库中都没有任何东西可以通过魔法为您做到这一点。

    编辑 - 充实它

    这是一个任何使用 Python3 的人都可以运行的完整可执行示例。备注:

    • 它不使用您的代码片段,因为它们依赖于外部数据库模块,不是每个人都可以运行。
    • 它坚持concurrent.futures 来管理进程和线程。改用multiprocessingthreading 并不难,实际上这里使用了 way 线程,直接使用threading 会更容易一些。不过这样就够清楚了。
    • concurrent.futuresFuture 对象与 multiprocessing 异步结果对象基本相同 - API 功能只是拼写不同。
    • 您的问题并不简单,因为它有多个可以以不同速度运行的阶段。同样,任何标准库中的任何内容都无法通过魔术隐藏潜在的不良后果。创建自己的有界队列仍然是最好的解决方案。对于MAX_QUEUE_SIZE 的任何合理值,此处的内存使用将保持适度。
    • 通常不希望创建比您可用的内核数量少一个更多的 CPU 绑定工作进程。主程序也需要循环运行,操作系统也是如此。
    • 一旦你习惯了这些东西,这段代码中的所有 cmets 都会很烦人,就像在代码行 i += 1 上看到注释“increment by 1” ;-)

    代码如下:

    import concurrent.futures as cf
    import threading
    import queue
    
    NUM_CPUS = 3
    NUM_THREADS = 4
    MAX_QUEUE_SIZE = 20
    
    # Runs in worker processes.
    def producer(i):
        return i + 10
    
    def consumer(i):
        global total
        # We need to protect this with a lock because
        # multiple threads in the main program can
        # execute this function simultaneously.
        with sumlock:
            total += i
    
    # Runs in threads in main program.
    def consume_results(q):
        while True:
            future = q.get()
            if future is None:
                break
            else:
                consumer(future.result())
    
    if __name__ == "__main__":
        sumlock = threading.Lock()
        result_queue = queue.Queue(MAX_QUEUE_SIZE)
        total = 0
        NUM_TO_DO = 1000
        with cf.ThreadPoolExecutor(NUM_THREADS) as tp:
            # start the threads running `consume_results`
            for _ in range(NUM_THREADS):
                tp.submit(consume_results, result_queue)
            # start the worker processes
            with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
                for i in range(NUM_TO_DO):
                    # blocks until the queue size <= MAX_QUEUE_SIZE
                    result_queue.put(pp.submit(producer, i))
            # tell threads we're done
            for _ in range(NUM_THREADS):
                result_queue.put(None)
        print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)
    

    如果一切顺利,这是预期的输出:

    got 509500 expected 509500
    

    【讨论】:

    • 感谢您的回答,很遗憾您给了我太多的信任。我真的没有得到线程/多处理的东西,因此要求一个很好的防白痴的例子。 :-) 恐怕我无法在我自己的代码中找出你的答案;但我已经编辑了我的问题以包括我的“多处理”内容。我对 concurrent.futures 的兴趣可能是它们更易于使用/理解。
    • 我稍后会充实更多代码(现在没有时间)。你说得对,concurrent.futures 更容易使用,但它也更弱。唉,到目前为止,在您的 mp 代码中,您确实创建了一个 Pool 但您从未使用它 - 根本没有并行性。让我们一次尝试一步:-)
    • 啊,那是我的错字;现在用 pool.applyasync 纠正。在更大的数据集上,我的速度提高了 4 倍;另一方面,RAM 使用量从 ~10MB 增加到 ~1.8GB! - 因此这个问题。干杯。 :-)
    • 查看我刚才的编辑以获得完整的可执行示例。
    • 谢谢蒂姆;我已经涉足它超过 30 分钟,但无法让它工作。它作为一个独立的工作,但是当我尝试将我的问题中的“输出”功能放在那里(我把它放在with sumlock:下面) - 发生了两件事:(a)它开始变得非常慢。 (b) 尽管函数中的“print(2)”将正确打印“2”,但不会发生任何数据库操作。如果我像往常一样从“main”内部调用output(),那么数据库的东西就可以正常工作。我做错了吗?
    猜你喜欢
    • 1970-01-01
    • 2011-12-29
    • 2014-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-03
    • 2020-05-15
    • 1970-01-01
    相关资源
    最近更新 更多