【Posted】: 2020-07-23 11:39:34
【Question】:
I have a concurrent.futures scraping script that I use for low-level stuff. Lately, however, it has started acting up: it gets stuck and never finishes.
I managed to narrow the problem down to 17 URLs (out of a batch of 18k, so you can imagine how much fun that was). Something about one or more of these 17 URLs must be causing the stall (a deadlock?), even though I use timeouts on both the requests and the futures. The strange thing is that it doesn't seem to be any single URL causing it: when I run the code, I do get log messages about URLs finishing, but the set of URLs that actually finish appears to change from run to run, so no single URL stands out as the culprit.
Any help is welcome.
(Run the function as is. Don't use runBad = False, since that expects a list of tuples.)
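A note on the futures timeout mentioned above: future.result(timeout=30) in the code below is called on futures that as_completed has already yielded as done, so that timeout can never fire; a hard cap on the whole batch would have to go on as_completed itself. A minimal sketch of such a cap (assuming the getMultiRequest worker from the full code below and a hypothetical 120 s deadline):

import concurrent.futures

def run_with_deadline(urlList, tOut, deadline=120):
    # Cap the *whole* batch: as_completed raises TimeoutError if any future
    # is still pending when the deadline expires.
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(getMultiRequest, url, True, "", tOut): url
                         for url in urlList}
        try:
            for future in concurrent.futures.as_completed(future_to_url, timeout=deadline):
                results.append(future.result())
        except concurrent.futures.TimeoutError:
            # Print the stuck URLs; note that the executor's __exit__ still waits
            # for the hung worker threads, so this only diagnoses the hang.
            stuck = [u for f, u in future_to_url.items() if not f.done()]
            print("Deadline hit, still running:", stuck)
    return results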
EDIT 1: This also happens with ProcessPoolExecutor.
EDIT 2: The problem seems to be related to the retries.
When I comment out these three lines and use a plain requests.get instead, it finishes without a problem. But why? Could this be a compatibility issue between the way Retry is implemented and concurrent.futures?
# s = requests.Session()
# retries = Retry(total=1, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False: return the response instead of raising RetryError
# s.mount("https://", HTTPAdapter(max_retries=retries))
EDIT 3: Even this simple request never finishes, so it really does have to be the mounted HTTPAdapter / max_retries. I even tried it without urllib3's Retry(), using just max_retries=2. Still no luck. I opened an issue to see whether we're missing something - https://github.com/psf/requests/issues/5538:
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5
s = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[503])
s.mount("https://", HTTPAdapter(max_retries=retries))
response = s.get('https://employdentllc.com', headers=HEADERS, timeout=TIMEOUT, verify=False)
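For completeness, the max_retries=2 variant from above looks roughly like this (per the requests docs, an int max_retries only retries failed DNS lookups, socket connections, and connection timeouts, never responses like 503 - yet it still hangs for me):

# EDIT 3 variant: plain int max_retries, no explicit urllib3 Retry object.
s = requests.Session()
s.mount("https://", HTTPAdapter(max_retries=2))
response = s.get('https://employdentllc.com', headers=HEADERS, timeout=TIMEOUT, verify=False)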
And here is the original concurrent.futures code:
import requests
import concurrent.futures
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.exceptions import HTTPError
from requests.exceptions import SSLError
from requests.exceptions import ConnectionError
from requests.exceptions import Timeout
from requests.exceptions import TooManyRedirects
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5
def getMultiRequest(url, runBad, bad_request, tTimeout):
    #print("url = ", url)
    s = requests.Session()
    retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False)  # raise_on_status=False: return the response instead of raising RetryError
    s.mount("https://", HTTPAdapter(max_retries=retries))
    if runBad == False:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)
            # Processing stuff // some can be pretty long (Levenshtein etc.)
            ret = (url, response.url, response.status_code, "", len(response.content), "", "", "")
        except HTTPError as e:
            ret = (url, "", e.response.status_code, "", 0, "", "", False)
        except SSLError:
            ret = (url, "", 0, "SSL certificate verification failed", 0, "", "", False)
        except ConnectionError:
            ret = (url, "", 0, "Cannot establish connection", 0, "", "", False)
        except Timeout:
            ret = (url, "", 0, "Request timed out", 0, "", "", False)
        except TooManyRedirects:
            ret = (url, "", 0, "Too many redirects", 0, "", "", False)
        except Exception:
            ret = (url, "", 0, "Undefined exception", 0, "", "", False)
        return ret
    else:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)
            # Processing stuff // some can be pretty long (Levenshtein etc.)
            ret = (url, response.url, response.status_code, "", "")
        except Exception:
            ret = (url, "", 0, "", "")
        return ret
def getMultiRequestThreaded(urlList, runBad, logURLs, tOut):
    responseList = []
    if runBad == True:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url, runBad, "", tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("BAD URL done: '" + url + "'.")
                    responseList.append(data)
    else:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url[0], runBad, url[1], tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future][0]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("LEGIT URL done: '" + url + "'.")
                    responseList.append(data)
    return responseList
URLs = [
    'https://www.appyhere.com/en-us',
    'https://jobilant.work/da',
    'https://www.iworkjobsite.com.au/jobseeker-home.htm',
    'https://youtrust.jp/lp',
    'https://passioneurs.net/ar',
    'https://employdentllc.com',
    'https://www.ivvajob.com/default/index',
    'https://praceapp.com/en',
    'https://www.safecook.be/en/home-en',
    'https://www.ns3a.com/en',
    'https://www.andjaro.com/en/home',
    'https://sweatcoin.club/',
    'https://www.pursuitae.com',
    'https://www.jobpal.ai/en',
    'https://www.clinicoin.io/en',
    'https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager',
    'https://dott.one/index.html'
]
output = getMultiRequestThreaded(URLs, True, True, TIMEOUT)
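One mechanism that would at least be consistent with a Retry-related hang (an assumption on my part, not something confirmed in the issue): the requests timeout applies per attempt, while urllib3's Retry sleeps between attempts - including, by default, for however long a server-sent Retry-After header demands on a 429/503 - and none of those sleeps are bounded by the timeout. A sketch for ruling that out:

# Assumption to test: a 503 with a huge Retry-After header makes urllib3
# sleep between attempts. respect_retry_after_header=False disables that.
retries = Retry(total=3, backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                raise_on_status=False, respect_retry_after_header=False)
s = requests.Session()
s.mount("https://", HTTPAdapter(max_retries=retries))
response = s.get('https://employdentllc.com', headers=HEADERS, timeout=TIMEOUT, verify=False)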
【Discussion】:
- Does the deadlock also happen if you use ProcessPoolExecutor?
- @AKX Not with this batch, no. I was under the impression, though, that ProcessPoolExecutor isn't meant to be used with requests. But I'll run some tests with it.
- I don't see why using a process pool would make any significant difference here over a thread pool, since you explicitly don't share a requests session. (Sharing one would help with connection pooling.)
- @AKX Spoke too soon: <class 'concurrent.futures.process.BrokenProcessPool'>
- @AKX - Was able to solve the above with if __name__ == '__main__', but it still gets stuck even with ProcessPoolExecutor.
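For reference, the guard from the last comment (needed because spawn-based platforms such as Windows re-import the main module in every worker process; this sketch assumes the executor in getMultiRequestThreaded has been swapped to ProcessPoolExecutor):

# Without this guard, each spawned worker re-executes the module top level,
# tries to create its own pool, and the pool breaks (BrokenProcessPool).
if __name__ == '__main__':
    output = getMultiRequestThreaded(URLs, True, True, TIMEOUT)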
Tags: python python-requests concurrent.futures