【问题标题】:Multithreading in Python 2.7Python 2.7 中的多线程
【发布时间】:2015-09-21 21:13:03
【问题描述】:

我不确定如何进行多线程处理,在阅读了一些 stackoverflow 答案后,我想出了这个。 注意:Python 2.7

from multiprocessing.pool import ThreadPool as Pool
pool_size=10
pool=Pool(pool_size)

for region, directory_ids in direct_dict.iteritems():
    for dir in directory_ids:  
        try:
            async_result=pool.apply_async(describe_with_directory_workspaces, (region, dir, username))
            result=async_result.get()
            code=result[0]      
            content=result[1]
        except Exception as e:
            print "Some error happening"
            print e

        if code==0:
            if content:
                 new_content.append(content)
            else:
                 pass
        else:
            return error_html(environ, start_response, content)

我在这里要做的是使用不同的区域和目录参数调用describe_with_directory_workspaces 并并行运行它,以便我在新内容中快速获取数据。目前,它正在串联,这给最终用户带来了缓慢的性能。

我做得对吗?或者有什么更好的方法吗?我如何确认我正在按预期运行多线程?

【问题讨论】:

  • 您尝试的解决方案中存在多个可能的问题(代码是串行的,您似乎是在 Web 请求中创建一个新的线程池)。而是从实际问题开始:它是否是 wsgi 应用程序的一部分(由error_html(..) 判断)? describe_with_..() 函数有什么作用(是否受 IO 限制)?您是否考虑过在不等待结果完成请求的情况下将任务卸载到后台线程/进程中?

标签: python multithreading python-2.7


【解决方案1】:

在您将所有作业排入队列之前,您不希望调用 async_result.get,否则您一次只能运行一项作业。

尝试先将所有作业排队,然后在所有作业都排队后处理每个结果。比如:

results = []
for region, directory_ids in direct_dict.iteritems():
    for dir in directory_ids:
        result = pool.apply_async(describe_with_directory_workspaces,
                                  (region, dir, username))
        results.append(result)

for result in results:
    code, content = result.get()
    if code == 0:
        # ...

如果您想在异步回调中处理结果,您可以向 pool.apply_async 提供回调参数,如文档中的here

for region, directory_ids in direct_dict.iteritems():
    for dir in directory_ids:
        def done_callback(result):
            pass  # process result...

        pool.apply_async(describe_with_directory_workspaces,
                         (region, dir, username),
                         done_callback)

【讨论】:

  • 有什么方法可以让结果不阻塞而不是对错误进行更多处理?我的意思是这可能对我有用,但会寻找更多选择以便我权衡。谢谢:)
  • 是的,如果您愿意,您可以提供一个回调例程,以便在每个任务完成时收到通知。
  • 使用一个比另一个的任何权衡?我能理解的是他们都做同样的事情。
  • 如果您需要进行一些处理并关心它们总体上的表现,我认为您会使用前者(收集所有结果并立即处理它们)。例如,如果其中任何一个失败,某些更高级别的操作会失败吗?当每个都是完全独立的事物时,您将使用后者。就个人而言,当有干净的替代品时(如这里),我会避免回调,因为我发现它更容易阅读。
  • 我也尝试了回调方法,您提到的方式但似乎不起作用。 done_callback 没有被调用。我错过了这个吗? stackoverflow.com/questions/32814738/…
【解决方案2】:

您应该查看 Python 的多处理模块。

来自 Python:Beazley 的基本参考:

“Python 线程实际上是相当受限的。虽然最低限度是线程安全的,但 Python 解释器使用内部全局解释器锁,它只允许单个 Python 线程在任何给定时刻执行。这限制了 Python 程序在单个处理器上运行无论系统上有多少 CPU 内核可用。”

所以:如果您正在进行大量 CPU 处理,请使用多处理。

文档链接:https://docs.python.org/2/library/multiprocessing.html

多处理池可能对您有用。

编辑:错过了代码已经在使用多处理。请参阅 cmets 以获得更有用的答案。此外,有关如何将 apply_async 与回调一起使用的示例,请参阅:Python multiprocessing.Pool: when to use apply, apply_async or map? 注意 Pool 还有一个 map_async 函数。

请参阅上述链接中的第 16.6.2.9 节。

EDIT2:在循环外使用 get( ) 的示例代码:

from multiprocessing import Pool

def sqr(x):
    return x*x

if __name__ == '__main__':
    po = Pool(processes = 10)
    resultslist = []
    for i in range(1, 10):
        arg = [i]
        result = po.apply_async(sqr, arg)
        resultslist.append(result)

    for res in resultslist:
        print(res.get())

【讨论】:

  • 我正在使用多处理池,但不确定它是否按预期工作。
  • 哎呀,漫长的一天。我看到可能出错的一件事是您在循环中调用 async_result.get() 。但是,如果结果不能立即获得,get() 将阻塞。所以我有一种感觉,无论如何这最终都会按顺序运行。
  • 另外请注意,如果您关心返回结果的顺序(即您希望它们按照调用顺序返回),那么您可能应该使用地图。
  • 这就是重点,我不希望它是连续的。另外,我对“结果”有一些 if-else,所以考虑你们可以提出的任何好的解决方法。
  • 如果您不希望它连续,请不要在循环中执行 get。在循环之外进行。或者使用回调。
【解决方案3】:

这段代码展示了 Python 中的多线程演示。我创建了一个发送和接收消息的简单信使。

import threading

class Messenger(threading.Thread):
def run(self):
    for _ in range(10):
        print(threading.current_thread().getName())

x = Messenger(name='Send out messages')
y = Messenger(name='Receive Messages')
x.start()
y.start()

【讨论】:

    猜你喜欢
    • 2016-07-10
    • 1970-01-01
    • 2018-04-24
    • 1970-01-01
    • 2017-05-16
    • 2016-04-08
    • 2014-01-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多