【问题标题】:concurrent.futures.ThreadPoolExecutor / Multithreading runs out of memory (Killed)concurrent.futures.ThreadPoolExecutor / 多线程内存不足(已终止)
【发布时间】:2021-11-20 16:14:07
【问题描述】:

我目前正在学习 python 的同时从事一个据说很简单的网络抓取项目。我有一个大约 70MB 的列表,其中包含我想要处理的几百万个 IP 地址 (sys.argv[1])。当然,并不是所有的都可以到达。

我正在尝试使用 concurrent.futures 并且目前遇到内存问题 - 最终导致整个进程被杀死。

现在,我按照here 的建议将我的未来分成两组(完成和未完成)。 我正在使用大约 100 个工作人员 (sys.argv[2]) 并且有 1GB 内存可用。

我虽然一旦用=> futures 1000 done 调用 future.results() 就会释放所有完成的期货?但是,它似乎只是在减慢进程(包括在进程被杀死之前填充内存)。

我在这里缺少什么?有关如何处理此问题的任何建议?

提前谢谢你。

我的代码如下:

import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def title(host):
    try:
        url="https://"+host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        print(host+": "+title)
    except:
        pass

max=int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    futures_done = set()
    futures_notdone = set()
    with open(sys.argv[1]) as f:
        for line in f:
            host = line.strip()
            futures_notdone.add(executor.submit(title, host))
            if len(futures_notdone) >= max:
                done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)
            for future in futures_done:
                if len(futures_done) >= 1000:
                    future.result()

【问题讨论】:

  • 编辑:线程似乎随着时间的推移而变慢。我让程序运行了几个小时。我会说函数(标题)以大约 20% 的初始启动速度运行。也许有些期货被卡住/没有被释放?

标签: python python-3.x multithreading python-multithreading concurrent.futures


【解决方案1】:

看起来您将完成的期货存储在一个集合中,而以后没有清除此列表,因此它可能会变得非常大。这可能是您的内存问题的原因。 future 的.release() 方法并没有释放它,它仍然在done_future 列表中被引用。

不完美,但您可以尝试以下方法。它最多调度max 作业同时执行。它定期收集已完成的工作并重新安排新工作。 idea 来自这个博客。

我在这种方法中看到的缺点是它必须定期轮询 max 计划的作业以找到已完成的作业,如果 max 值较大,这可能会很慢。

import sys
import requests
import concurrent.futures
import urllib3
from itertools import islice
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def title(host: str) -> str:
    try:
        url="https://"+host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        return host+": "+title
    except:
        pass

max = int(sys.argv[2])

with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    with open(sys.argv[1]) as f:
        futures = {executor.submit(title, h) for h in islice(f, max)}
        
        while futures:
            done, futures = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)

            for future in done:
                print(future.result())

            for h in islice(f, len(done)):
                futures.add(executor.submit(title, h))

这是一个可能对您有用的解决方法,它在我的计算机上运行了超过 100 万次迭代,而没有使用超过 150 Mo。

它只是一个带有两个队列的自定义线程池,用于管理并发资源访问并限制最大并发。

import sys
from typing import Optional
import requests
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from queue import Queue
from threading import Thread


def get_title(host: str) -> Optional[str]:
    try:
        url = f"https://{host}"
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=1, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        return f"{host}: {title}"
    except Exception:
        return None

class Pool:
    def __init__(self, work, max_concurrent_jobs, max_worker: int = 32) -> None:
        self.max_workers = max_worker
        self.work_queue = Queue(max_concurrent_jobs)
        self.out_queue = Queue()
        self.is_running = True

        def _work():
            while self.is_running:
                item = self.work_queue.get()
                result = work(item)
                self.work_queue.task_done()
                self.out_queue.put(result)

        for _ in range(max_worker):
            Thread(target=_work).start()

    def close(self):
        self.is_running = False


if __name__ == "__main__":
    file_name = sys.argv[1]
    max = int(sys.argv[2])
    pool = Pool(work=get_title, max_concurrent_jobs=max)

    def worker():
        while True:
            item = pool.out_queue.get()
            if item is not None:
                print(item) # Or any follow-up job
            pool.out_queue.task_done()

    Thread(target=worker, daemon=True).start()

    with open(file_name) as f:
        for h in f:
            pool.work_queue.put(h.strip())

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-11-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多