【问题标题】:Waiting for all functions to return before returning value在返回值之前等待所有函数返回
【发布时间】:2017-07-24 18:06:01
【问题描述】:

想象一下我有这样的功能:

def func1():
    return int(requests.get('https://www.random.org/integers/?num=1&min=0&max=10&col=1&base=10&format=plain&rnd=new').text)

我想调用这个函数三次,对响应求和,然后返回平方和或一些简单的东西。我该怎么做才能使三个函数调用是异步的,但它会等待它们全部返回,就像 JavaScript 中的 Promise.all?

我正在使用 Python 2。

【问题讨论】:

  • 为什么不用线程?
  • 如果您使用的是 Python 3,您可以使用它的异步功能...在 Python 2 中,您必须使用 threading

标签: python python-2.7 asynchronous


【解决方案1】:

您需要为此使用线程,最简单的方法是通过 multiprocessing.pool.ThreadPool 包(不要让它欺骗你,尽管它在包中,但它不是多处理):

import requests
from multiprocessing.pool import ThreadPool

# let's make it a bit more flexible
RANDOM_URL = 'https://www.random.org/integers/'
RANDOM_PARAMS = {"num": 1, "min": 0, "max": 10, "col": 1, "base": 10,
                 "format": "plain", "rnd": "new"}

def func1(*args):  # args added to account for the dummy payload from pool.map
    return int(requests.get(RANDOM_URL, params=RANDOM_PARAMS).text)

pool = ThreadPool(processes=3)
response_sum = sum(pool.map(func1, range(3)))
print("Squared response: {}".format(response_sum**2))

【讨论】:

  • 只是好奇,我的答案和你的答案有什么性能差异吗?在这种 IO 绑定用例中,多线程会更高效还是多处理?
  • @SamChats - 正如我所说,虽然 ThreadPoolmultiprocessing.pool 包中,但它根本不使用多处理 - 它只是 ye olde threading 结构的便捷包装器,所以您不必自己将数据映射到线程,也不必处理线程完成处理后如何将其取回。它的主要目的是用作模拟multiprocessing.Pool,主要是出于测试/调试的原因,但对于这种情况非常有用。
  • 酷。这是否与Queue 结构远程相关?
  • 在这种情况下 - 甚至没有。
【解决方案2】:

为了您的目的,我稍微修改一下函数:

def func1():
    page = requests.get('https://www.random.org/integers/?num=1&min=0&max=10&col=1&base=10&format=plain&rnd=new').text
    num = extract_num(page) # Any parsing function that extracts the number and converts it to an integer type.
    cumulative += num # Sums up

然后,使用线程:

import threading

cumulative = 0

for i in range(3): # Gets random numbers asynchronously.
    threading.Thread(target=func1).start()
print cumulative

【讨论】:

  • 我看到你用全局变量做了什么,但我只是将随机数作为一个可能需要可变时间的请求的示例,我并不特别关心对随机数求和跨度>
  • 在打印cumulative 结果之前,您应该处理将线程连接回主线程。您还应该为 cumulative 变量使用互斥锁,因为 cumulative += num 不是原子操作。要查看问题,请将func1() 中的num 替换为固定数字(例如10)并将整个内容循环20 次,然后查看打印的内容。加上 cumulative 必须被定义为全局才能工作。
  • @Rob 哦!但是上面定义的线程可以处理可变时间。他们不会做你想做的事吗?
  • @zwer 嗯...我认为 join() 只有在初始化线程后没有任何操作时才需要...但是那里有一个print 所以我没有使用join()
  • join() 是必要的,以确保您的线程在使用它们的结果之前已完成处理(如本例中的打印)。互斥体(threading.Lock 在这里就足够了)是必要的,以确保一次只有一个线程正在修改cumulative,以便一个线程的添加不会覆盖另一个线程的添加。 global cumulative 是必需的,以便 func1() 实际上会修改全局 cumulative 变量。
【解决方案3】:

根据您的喜好,如果您可以安装第 3 方反向端口模块 https://pypi.python.org/pypi/futures,您还可以使用 concurrent.futures 作为更高级别的接口,以避免处理线程并使用与 Promise 更相似的 API /您在问题中提到的期货:

from concurrent.futures import ThreadPoolExecutor
import requests

def func1():
    return int(requests.get('https://www.random.org/integers/?num=1&min=0&max=10&col=1&base=10&format=plain&rnd=new').text)

ex = ThreadPoolExecutor()
f1 = ex.submit(func1)
f2 = ex.submit(func1)
f3 = ex.submit(func1)

print(f1.result(), f2.result(), f3.result())

【讨论】:

  • 引用OP“我正在使用Python 2。”,它也在问题的标签中。 concurrent.futures.ThreadPoolExecutor 在 Python 2 中不可用。
  • 我认为它在 2 中也是标准的,抱歉。事实证明,您必须安装一个反向端口模块才能在 python 2 pypi.python.org/pypi/futures 中获取它
猜你喜欢
  • 2020-04-09
  • 2021-12-02
  • 1970-01-01
  • 2019-06-11
  • 1970-01-01
  • 2020-03-01
  • 2014-11-26
  • 2011-08-16
  • 1970-01-01
相关资源
最近更新 更多