【发布时间】:2016-06-15 07:21:46
【问题描述】:
我正在通过 HTTP 连接到本地服务器 (OSRM) 以提交路由并取回行驶时间。我注意到 I/O 比线程慢,因为计算的等待时间似乎小于发送请求和处理 JSON 输出所需的时间(我认为当服务器需要一些时间来处理处理您的请求->您不希望它被阻塞,因为您必须等待,这不是我的情况)。线程受到全局解释器锁的影响,因此看起来(以及下面的证据)我最快的选择是使用多处理。
多处理的问题是它太快了,以至于它耗尽了我的套接字并且我得到一个错误(请求每次都发出一个新连接)。我可以(串行)使用 requests.Sessions() 对象来保持连接处于活动状态,但是我不能让它并行工作(每个进程都有自己的会话)。
我目前最接近的代码是这个多处理代码:
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % (err, qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
但是,我无法让 HTTPConnectionPool 正常工作,它每次都会创建新的套接字(我认为)然后给我错误:
HTTPConnectionPool(host='127.0.0.1', port=5005): 超过最大重试次数 带网址: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (NewConnectionError(': 建立新连接失败: [WinError 10048] 每个套接字地址只能使用一次 (协议/网络地址/端口)通常是允许的',))
我的目标是从我在本地运行的OSRM-routing server 获取距离计算(尽快)。
我有两个部分的问题 - 基本上我正在尝试使用 multiprocessing.Pool() 将一些代码转换为更好的代码(适当的异步函数 - 以便执行永远不会中断并且运行速度尽可能快)。
我遇到的问题是我尝试的一切似乎都比多处理慢(我在下面提供了几个我尝试过的示例)。
一些潜在的方法是:gevents、grequests、tornado、requests-futures、asyncio 等。
A - Multiprocessing.Pool()
我最初是这样开始的:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
我在哪里连接到本地服务器 (localhost,port:5005),它在 8 个线程和 supports parallel execution 上启动。
经过一番搜索,我意识到我得到的错误是因为请求是opening a new connection/socket for each-request。所以这实际上太快了,一段时间后会耗尽套接字。解决这个问题的方法似乎是使用 requests.Session() - 但是我无法使用多处理(每个进程都有自己的会话)来解决这个问题。
问题 1。
在某些计算机上运行良好,例如:
以后比较:45% 的服务器使用率和每秒 1700 个请求
但是,在某些情况下它没有,我不完全理解为什么:
HTTPConnectionPool(host='127.0.0.1', port=5000): 超过最大重试次数 带网址: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (由 NewConnectionError(': 建立新连接失败: [WinError 10048] 每个套接字地址只能使用一次 (协议/网络地址/端口)通常是允许的',))
我的猜测是,因为 requests 在使用时会锁定套接字 - 有时服务器太慢而无法响应旧请求并生成新请求。服务器支持排队,但是请求不支持,而不是添加到队列中我得到错误?
问题 2。
我找到了:
阻塞还是非阻塞?
使用默认传输适配器,Requests 不提供 任何类型的非阻塞 IO。 Response.content 属性将阻塞 直到整个响应被下载。如果您需要更多 粒度,库的流式传输功能(请参阅流式传输 请求)允许您在以下位置检索更少量的响应 一次。但是,这些调用仍然会阻塞。
如果您担心阻塞 IO 的使用,有很多 将请求与 Python 之一结合起来的项目 异步框架。
两个很好的例子是 grequests 和 requests-futures。
B - 请求-期货
为了解决这个问题,我需要重写我的代码以使用异步请求,所以我尝试了以下使用:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(顺便说一下,我用使用所有线程的选项启动我的服务器)
以及主要代码:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
calc_routes.append(row)
我的函数 (ReqOsrm) 现在改写为:
def ReqOsrm(sess, resp):
json_geocode = resp.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
# Cannot find route between points (code errors as 999)
else:
out = [999, 0, 0, 0, 0, 0, 0]
resp.data = out
但是,此代码比多处理代码慢!在我每秒收到大约 1700 个请求之前,现在我得到了 600 秒。我猜这是因为我没有充分的 CPU 利用率,但是我不知道如何增加它?
C - 线程
我尝试了另一种方法 (creating threads) - 但是再次不确定如何获得此方法以最大限度地提高 CPU 使用率(理想情况下,我希望我的服务器使用 50%,不是吗?):
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = resp.json()
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done but no route")
out = [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("Error: %s" % err)
out = [qid, 999, 0, 0, 0, 0, 0, 0]
qres.put(out)
return
#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in url_routes:
q.put(url)
q.join()
except Exception:
pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
这个方法比我认为的 requests_futures 快,但我不知道要设置多少线程来最大化这个 -
D - 龙卷风(不工作)
我现在正在尝试龙卷风 - 但是如果我使用 curl,它无法完全让它工作,它会与现有代码 -1073741819 中断 - 如果我使用 simple_httpclient 它可以工作,但是我会收到超时错误:
错误:tornado.application:产量列表中的多个异常 Traceback (最近一次通话最后):文件 “C:\Anaconda3\lib\site-packages\tornado\gen.py”,第 789 行,在回调中 result_list.append(f.result()) 文件“C:\Anaconda3\lib\site-packages\tornado\concurrent.py”,第 232 行,在 结果 raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout
def handle_req(r):
try:
json_geocode = json_decode(r)
status = int(json_geocode['status'])
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
print(out)
except Exception as err:
print(err)
out = [999, 0, 0, 0, 0, 0, 0]
return out
# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)
@gen.coroutine
def run_experiment(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url, qid in urls]
responses_out = [handle_req(r.body) for r in responses]
raise gen.Return(value=responses_out)
# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio / aiohttp
决定尝试使用 asyncio 和 aiohttp 的另一种方法(尽管让 tornado 工作会很棒)。
import asyncio
import aiohttp
def handle_req(data, qid):
json_geocode = json.loads(data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done, but not route for {0} - status: {1}".format(qid, status))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
return out
def chunked_http_client(num_chunks):
# Use semaphore to limit number of requests
semaphore = asyncio.Semaphore(num_chunks)
@asyncio.coroutine
# Return co-routine that will download files asynchronously and respect
# locking fo semaphore
def http_get(url, qid):
nonlocal semaphore
with (yield from semaphore):
response = yield from aiohttp.request('GET', url)
body = yield from response.content.read()
yield from response.wait_for_close()
return body, qid
return http_get
def run_experiment(urls):
http_client = chunked_http_client(500)
# http_client returns futures
# save all the futures to a list
tasks = [http_client(url, qid) for url, qid in urls]
response = []
# wait for futures to be ready then iterate over them
for future in asyncio.as_completed(tasks):
data, qid = yield from future
try:
out = handle_req(data, qid)
except Exception as err:
print("Error for {0} - {1}".format(qid,err))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
response.append(out)
return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
这工作正常,但仍然比多处理慢!
【问题讨论】:
-
另一种方法是使用事件循环,而不是尝试使用最佳线程池大小。您可以使用回调注册请求,并在返回响应时等待事件循环处理
-
@dm03514 谢谢!但是,这不是我在做 requests-futures 示例时所拥有的吗?
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) -
我从未使用过RequestFuture,但我认为它仍然委托给线程池,事件循环应该是一个新的请求模型,并且只会暴露一个线程,所以你不要'不必担心要配置多少线程来工作:) python 在 stdlibrary pypi.python.org/pypi/aiohttp 中有一个,我从未使用过,但看起来相对简单,tornado 是一个基于 os 事件库的框架,它具有简单的 api。 tornadokevinlee.readthedocs.org/en/latest/httpclient.html
-
@dm03514 我尝试使用 aiohttp,它做得相当好(比 requests-futures 更好)。但是,速度仍然比使用多处理要慢 - 除非我执行不正确并遇到瓶颈
-
@mptevsion 我正在尝试做同样的事情。你能告诉我当你说“......连接到在 8 个线程上启动并支持并行执行的本地服务器 (localhost,port:5005)”时你的意思吗?你如何在 8 个线程上午餐 OSRM 服务器?您如何支持并行执行?最后一个问题:你能给我一个 url_routes 的例子吗? qid 是什么?
标签: python multithreading asynchronous concurrency python-multiprocessing