【问题标题】:Correct way to run something in a thread that works in 2/3?在 2/3 工作的线程中运行某些东西的正确方法?
【发布时间】:2018-03-19 19:47:13
【问题描述】:

我在网上找到的每一个结果要么非常老,特定于 Python 2.7 或 3.x,要么太复杂以至于我无法理解。我的用例非常简单,但我很难找到一个简单的解决方案。我现有的代码如下所示:

return_val = func_that_makes_a_request(val1, val2)

lots = "of"
other()
code()
foo = some_other_func_that_makes_a_request(val3, val4)

something_else(foo, return_val)

第一行结束发出 HTTP 请求。接下来的几行计算一些值,然后用于在第 2 到最后一行中发出另一个 HTTP 请求。这两个部分(顶行,除最后一行之外的所有内容)彼此完全独立。最后一行需要两者的结果。

我真正想要的是并行发出这两个 HTTP 请求,因为两者都不需要等待另一个。在 Python 2.7+ 中工作的最简单方法是什么?

为未来的读者编辑 使用这个解决方案:

with multiprocessing.dummy.Pool(1) as thread_pool:
    return_async = thread_pool.apply_async(func_that_makes_a_request, (val1, val2))

    lots = "of"
    other()
    code()
    foo = some_other_func_that_makes_a_request(val3, val4)

    return_val = return_async.get()

    something_else(foo, return_val)

【问题讨论】:

  • 你能要求 Python 2.7 的 PyPI 包吗?如果你愿意pip install futures,你可以在 2.7 和 3.x 中使用它。
  • 我可以,但如果有其他几乎相同的选项,那就更好了。
  • 或者,裸threading 模块和multiprocessing.dummy 都适用于两个版本(可能有一点six-ing,但您可能想使用six 或无论如何都相当于双版本代码)。
  • 事实证明,threading 不需要six,因为 2.7 对所有内容都有旧式和新式名称。但是在任何严肃的项目中,你都会一次或一次遇到一些你想要导入 ElementTree 或需要一个可迭代的惰性字典键或其他东西的东西,它的拼写有点不同,six(或modernize , 如果你是围绕2to3 构建的) 会更容易。
  • 暂别我在这里所说的,我看到你正在主线程中执行第二个任务,所以Pool(1) 工作正常。

标签: python multithreading python-3.x python-2.7


【解决方案1】:

stdlib 中的低级 threading 模块在 2.6+ 和 3.x 中是相同的,至少对于您正在做的基本工作而言。这有点笨拙,因为开箱即用它没有为线程任务提供任何返回值的方法,但是您可以非常简单地添加它。例如:

class ReturningThread(threading.Thread):
    def run(self):
        try:
            if self._target:
                self._result = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs
    # I'm breaking the optional timeout parameter to simplify
    def join(self):
        super(threading.Thread, self).join()
        return self._result

现在:

t1 = ReturningThread(target=func_that_makes_a_request, args=(val1, val2))

lots = "of"
other()
code()
t2 = ReturningThread(target=some_other_func_that_makes_a_request, args=(val3, val4))

return_val = t1.join()
something_else(foo, return_val)
other_return_val_we_dont_need_until_later = t2.join()

multiprocessing.dummy 模块允许您在线程之上使用更高级别的multiprocessing 样式Pool 和类似的构造。

with multiprocessing.dummy.Pool(2) as p:
    return_async = p.apply_async(func_that_makes_a_request, val1, val2)

    lots = "of"
    other()
    code()
    return_async_2 = p.apply_async(some_other_func_that_makes_a_request, val3, val4)

    return_val = return_async.get()
    something_else(foo, return_val)
    other_return_val_we_dont_need_until_later = return_async_2.get()

concurrent.futures 模块是做你想做的最简单的方法,但它在 2.7 中不存在,所以你需要pip install futures 来获取 2.7 的反向端口,然后在文件顶部:

try:
    from concurrent import futures
except ImportError:
    import futures

关于这个的好处是,您尝试执行的操作与文档中的示例之一相匹配。但是,由于您并没有真正使用任何池/执行器功能(例如能够并行等待多个结果或组合期货),它最终将与 multiprocessing 示例相同,但命名不同:

with futures.ThreadPoolExecutor(max_workers=2) as x:
    return_future = x.submit(func_that_makes_a_request, val1, val2)

    lots = "of"
    other()
    code()
    return_fut_2 = x.submit(some_other_func_that_makes_a_request, val3, val4)

    return_val = return_fut.result()
    something_else(foo, return_val)
    other_return_val_we_dont_need_until_later = return_fut_2.result()

【讨论】:

  • 谢谢!巨大的帮助。我的真实代码显然比我的示例更复杂,但我编辑了 OP 以举例说明我如何将其应用于我的实际代码。仅将线程池用于其中一项功能有什么问题吗?
  • @user3715648:不,没问题;您只能将线程池用于一项任务。在这种情况下,它并不是一个“池”,但它足以启动后台任务然后在前台做一些工作。这是 80 年代/90 年代软件的常见设计,主要是单线程,但偶尔会有大型后台工作,例如文字处理器压缩和重新分页文件,如果您正在执行类似类型的并发,它仍然可以正常工作。
【解决方案2】:

multiprocessing 模块可以轻松地同时计算多个事物,尤其是使用 Pool.map。通常这是为了将一个函数应用于不同的值,而不是完全不同的函数,但我们可以轻松地扭转这一局面。

from multiprocessing.dummy import Pool
import time

def func1():
    print('starting 1')
    time.sleep(2)
    return 1

def func2():
    print('starting 2')
    time.sleep(2)
    return 2

def run_in_parallel(*funcs):
    return Pool(len(funcs)).map(lambda f: f(), funcs)

result1, result2 = run_in_parallel(func1, func2)

print(result1 + result2)

请注意,multiprocessing.dummy 表示我们正在使用线程。如果您想要真正独立的进程(由于您发出不受 CPU 限制的 HTTP 请求,因此您不需要这些进程),您将删除 .dummy

编辑:

以上是一个可运行的示例,在您的情况下是:

def func1():
    return func_that_makes_a_request(val1, val2)

def func2():
    lots = "of"
    other()
    code()
    return some_other_func_that_makes_a_request(val3, val4)

return_val, foo = run_in_parallel(func1, func2)

【讨论】:

  • 很难将他的问题改编成map,因为他需要在提交第一个和第二个任务之间进行一些计算,然后需要阻塞第一个而不阻塞第二个.如果他可以重组他的代码以这样做,当然,它可能会更干净,并且更容易map,但如果这不可能,直接使用apply_async而不是使用imap/next 破解map 可能更简单。
  • “因为他需要在提交第一个和第二个任务之间进行一些计算”我认为将“介于两者之间”的代码放入第二个函数中没有什么大问题。 OP 的措辞表明他已经在他的脑海中捆绑了这样的代码:“(顶行,除最后一行之外的所有内容)”。我也不明白“然后需要阻止第一个而不阻止第二个”:something_else(foo, return_val) 暗示他同时需要两个任务的结果。
  • 那个return_val来自运行第一个函数;它不可能依赖第二个函数的结果。
  • 至于另一部分:是的,他可以重新组织他的代码来完成第二个函数中的额外工作。而且,正如我所说,如果他将他的代码重新组织为map-able,它也可能会更干净。但这样做并不总是可行或适当的,即使这可能是为了他的玩具示例。
  • "那个 return_val 来自于运行第一个函数;" ...和foo 来自第二个函数,当它们第一次使用时,它是在一起的。而且它不是重新组织代码,它只是将两个块放入函数中。我不打算将 func_that_makes_a_requestsome_other_func_that_makes_a_request 直接传递给 run_in_parallel,因为它们需要参数。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-04-17
  • 1970-01-01
  • 1970-01-01
  • 2019-08-10
  • 2019-04-06
相关资源
最近更新 更多