【Question Title】: What is causing concurrent.futures deadlock? Code included [duplicate]
【Posted】: 2020-07-23 11:39:34
【Question】:

I have a concurrent.futures scraping script that I use for some low-level stuff. However, it has started acting up: it gets stuck and never finishes.

I was able to narrow the problem down to 17 URLs (out of a batch of 18k; you can imagine how much fun that was). Something about one or more of these 17 URLs must be causing the stall (deadlock?), even though I use timeouts both for the requests and for the futures. Strangely, it does not seem to be any single URL causing it. When I run the code, I do get logs about URLs being finished, but the set of URLs that actually finish seems to change from run to run, so no single URL stands out as the culprit.

Any help is welcome.

(Run the function as is. Don't use runBad = False, since that expects a list of tuples.)

EDIT 1: This also happens with ProcessPoolExecutor.

EDIT 2: The problem seems to be related to retries. When I comment out these three lines and use a plain requests.get, it finishes without any problems. But why is that? Could this be a compatibility issue between the way Retry is implemented and concurrent.futures?

#    s = requests.Session()
#    retries = Retry(total=1, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = returns a response instead of raising RetryError
#    s.mount("https://", HTTPAdapter(max_retries=retries))

EDIT 3: Even this simple request doesn't work. So it really must be the mounted HTTPAdapter / max_retries. I even tried it without urllib3's Retry(), using just max_retries=2. Still didn't work. Opened an issue to see whether we're missing anything - https://github.com/psf/requests/issues/5538

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings
 
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5

s = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[503])
s.mount("https://", HTTPAdapter(max_retries=retries))
response = s.get('https://employdentllc.com', headers=HEADERS, timeout=TIMEOUT, verify=False)

Here is the original concurrent.futures code:

import requests
import concurrent.futures
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.exceptions import HTTPError
from requests.exceptions import SSLError
from requests.exceptions import ConnectionError
from requests.exceptions import Timeout
from requests.exceptions import TooManyRedirects
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings

HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5

def getMultiRequest(url, runBad, bad_request, tTimeout):
    #print("url = ", url)
    s = requests.Session()
    retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = instead of RetryError returns a response
    s.mount("https://", HTTPAdapter(max_retries=retries))
    if runBad == False:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)
           
                                            # Processing stuff // some can be pretty long (Levenshtein etc)
               
            ret = (url, response.url, response.status_code, "", len(response.content), "", "", "")
        except HTTPError as e:
            ret = (url, "", e.response.status_code, "", 0, "", "", False)
        except SSLError:
            ret = (url, "", 0, "SSL certificate verification failed", 0, "", "", False)
        except ConnectionError:
            ret = (url, "", 0, "Cannot establish connection", 0, "", "", False)
        except Timeout:
            ret = (url, "", 0, "Request timed out", 0, "", "", False)
        except TooManyRedirects:
            ret = (url, "", 0, "Too many redirects", 0, "", "", False)
        except Exception:
            ret = (url, "", 0, "Undefined exception", 0, "", "", False)
        return ret
    else:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)
           
                                            # Processing stuff // some can be pretty long (Levenshtein etc)
               
            ret = (url, response.url, response.status_code, "", "")
        except Exception:
            ret = (url, "", 0, "", "")
        return ret

def getMultiRequestThreaded(urlList, runBad, logURLs, tOut):
    responseList = []
    if runBad == True:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url, runBad, "", tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("BAD URL done: '" + url + "'.")
                    responseList.append(data)
    else:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url[0], runBad, url[1], tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future][0]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("LEGIT URL done: '" + url + "'.")
                    responseList.append(data)
    return responseList

URLs = [
    'https://www.appyhere.com/en-us',
    'https://jobilant.work/da',
    'https://www.iworkjobsite.com.au/jobseeker-home.htm',
    'https://youtrust.jp/lp',
    'https://passioneurs.net/ar',
    'https://employdentllc.com',
    'https://www.ivvajob.com/default/index',
    'https://praceapp.com/en',
    'https://www.safecook.be/en/home-en',
    'https://www.ns3a.com/en',
    'https://www.andjaro.com/en/home',
    'https://sweatcoin.club/',
    'https://www.pursuitae.com',
    'https://www.jobpal.ai/en',
    'https://www.clinicoin.io/en',
    'https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager',
    'https://dott.one/index.html'
]

output = getMultiRequestThreaded(URLs, True, True, TIMEOUT)

【Comments】:

  • If you use a ProcessPoolExecutor, does the deadlock occur too?
  • @AKX Not with this batch, no. However, I was under the impression that ProcessPoolExecutor is not desirable for requests. But I'll run some tests with it.
  • I don't see why using a process pool would make any significant difference here over a thread pool, since you're explicitly not sharing a requests session. (Sharing one would help with connection pooling.)
  • @AKX Spoke too soon: <class 'concurrent.futures.process.BrokenProcessPool'>
  • @AKX - Was able to work around the above with if __name__ == '__main__':, but it still gets stuck even with ProcessPoolExecutor

Tags: python python-requests concurrent.futures


【Solution 1】:

I modified the program to add all of the URLs to a set, and as each URL's fetch completed (good or bad) in the loop for future in concurrent.futures.as_completed(future_to_url):, I removed that URL from the set and printed the current contents of the set. That way, when the program finally hung, I knew what was still outstanding: it was always the URLs https://employdentllc.com and https://www.pursuitae.com.
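The tracking trick described above can be sketched like this (a minimal sketch; fetch is a stand-in for the real request function):

```python
import concurrent.futures

def fetch(url):
    # Stand-in for the real request function; just echoes its URL back.
    return url

def run_with_progress(urls):
    remaining = set(urls)  # URLs whose futures have not completed yet
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            remaining.discard(url)
            print("done:", url, "| still pending:", sorted(remaining))
            results.append(future.result())
    return results
```

When the pool hangs, the last "still pending" line printed names exactly the URLs whose futures never completed.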

When I tried fetching these URLs on their own, both returned 503 Service Unavailable errors. And when I commented out the following two lines, the program ran to completion.

retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = instead of RetryError returns a response
s.mount("https://", HTTPAdapter(max_retries=retries))

Removing just code 503 from the list did not help. Either there is some other problem with this specification (although it looks correct, apart from the rather large backoff_factor, which I reduced just to be sure the program was not simply still sleeping between retries), or there is a problem with requests or urllib3.

Below is a printout of each result in the variable output:

('https://www.appyhere.com/en-us', 'https://www.appyhere.com/en-us', 200, '', '')
('https://www.iworkjobsite.com.au/jobseeker-home.htm', 'https://www.iworkjobsite.com.au/jobseeker-home.htm', 200, '', '')
('https://passioneurs.net/ar', 'https://passioneurs.net/ar', 404, '', '')
('https://youtrust.jp/lp', 'https://youtrust.jp/lp', 200, '', '')
('https://jobilant.work/da', 'https://jobilant.work/da/', 200, '', '')
('https://employdentllc.com', 'https://employdentllc.com/', 503, '', '')
('https://www.ivvajob.com/default/index', 'https://www.ivvajob.com/default/index', 200, '', '')
('https://www.ns3a.com/en', 'https://www.ns3a.com/en', 200, '', '')
('https://www.safecook.be/en/home-en', 'https://www.safecook.be/en/home-en/', 200, '', '')
('https://sweatcoin.club/', 'https://sweatcoin.club/', 200, '', '')
('https://www.andjaro.com/en/home', 'https://www.andjaro.com/en/home/', 200, '', '')
('https://praceapp.com/en', 'https://praceapp.com/en/', 200, '', '')
('https://www.clinicoin.io/en', 'https://www.clinicoin.io/en', 200, '', '')
('https://www.jobpal.ai/en', 'https://www.jobpal.ai/en/', 200, '', '')
('https://dott.one/index.html', 'https://dott.one/index.html', 200, '', '')
('https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager', 'https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager', 404, '', '')
('https://www.pursuitae.com', 'https://www.pursuitae.com/', 503, '', '')

UPDATE

I found the problem. You need the respect_retry_after_header=False parameter:

retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False, respect_retry_after_header=False) # raise_on_status=False = instead of RetryError returns a response

You may also want to reduce backoff_factor to 1.
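Why that header matters can be demonstrated without any network traffic (a minimal sketch using urllib3's Retry.parse_retry_after helper): respect_retry_after_header defaults to True, so a 503 that advertises e.g. Retry-After: 3600 makes urllib3 sleep the full hour before each retry attempt, which from the outside looks exactly like a deadlock:

```python
from urllib3.util.retry import Retry

retries = Retry(total=3, backoff_factor=5, status_forcelist=[503])

# The default honors whatever Retry-After value the server sends back.
print(retries.respect_retry_after_header)  # True

# A "Retry-After: 3600" header translates into a full hour of sleeping
# per retry attempt, dwarfing any backoff_factor setting.
print(retries.parse_retry_after("3600"))   # 3600
```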

This now appears to be a duplicate of Retry for python requests module hanging.
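For completeness, the full session wiring with the fix applied might look like this (a sketch; make_session is just an illustrative helper name, and the import path uses urllib3 directly rather than requests.packages):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session():
    # respect_retry_after_header=False stops urllib3 from sleeping for
    # whatever Retry-After interval the 503 responses advertise.
    retries = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        raise_on_status=False,
        respect_retry_after_header=False,
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session
```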

【Comments】:

  • Thanks, I actually opened an issue for this to see whether we're missing anything: github.com/psf/requests/issues/5538
  • See my updated answer.
  • Thanks man, you're awesome! Upvoted!
【Solution 2】:

I was able to reproduce the deadlock. I'm not sure why it happens, but it does not occur with multiprocessing.pool.ThreadPool():

from multiprocessing.pool import ThreadPool
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"
}

def do_request(job):
    (sess, url, timeout) = job  # no `istarmap_unordered`...
    try:
        response = sess.get(url, headers=HEADERS, timeout=timeout, verify=False)
        return (url, response.url, response.status_code, "", "")
    except Exception as exc:
        return (url, "", 0, str(exc), "")


def get_responses_threaded(url_list):
    with ThreadPool() as p, requests.Session() as sess:
        jobs = [(sess, url, 5) for url in url_list]  # no `istarmap_unordered`...
        yield from p.imap_unordered(do_request, jobs)


urls = [
    # ...
]

for resp in get_responses_threaded(urls):
    print(resp)

【Comments】:

  • You actually changed two things here, the pooling mechanism and the error recovery, which is now gone. There is nothing wrong with concurrent.futures; the problem is in how retries were being attempted for a couple of problematic URLs. See my answer.