【Title】: Unable to modify a function to stick to one working proxy
【Posted】: 2019-12-14 12:18:49
【Description】:

I've written a script in Python that uses proxies together with multiprocessing while sending requests to some links, in order to parse product names from there. My current attempt does the job erroneously: it slows the process down by trying three new proxies on every call, regardless of whether the proxy currently in use is good or bad.

Since I'm using multiprocessing, as multiprocessing.dummy in the script, I wish to modify the parse_product_info() function in such a way that it doesn't call the process_proxy() function multiple times and spin up three new proxies unless a proxy is actually identified as bad. To be clearer: with my current attempt, whether the proxy in use is good or bad, I can see three new proxies come into play on every call when a link is fed to parse_product_info(link), because of the 3 I used within Pool().

I have tried with:

import random
import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool

linklist = [
    'https://www.amazon.com/dp/B00OI0RGGO', 
    'https://www.amazon.com/dp/B00TPKOPWA', 
    'https://www.amazon.com/dp/B00TH42HWE', 
    'https://www.amazon.com/dp/B00TPKNREM', 
]

def process_proxy():
    global proxyVault
    if len(proxyVault)!=0:
        random.shuffle(proxyVault)
        proxy_url = proxyVault.pop()
        proxy = {'https': f'http://{proxy_url}'}
    else:
        proxy = None
    return proxy


def parse_product_info(link):
    global proxy
    try:
        if not proxy: raise  # if proxy doesn't hold one yet, jump to the except block to grab one, as long as the proxy list isn't empty
        print("proxy to be used:",proxy)
        res = requests.get(link,proxies=proxy,timeout=5)
        soup = BeautifulSoup(res.text,"html5lib")
        try:
            product_name = soup.select_one("#productTitle").get_text(strip=True)
        except Exception:
            product_name = ""
        print(link,product_name)

    except Exception:
        proxy = process_proxy()
        if proxy!=None:
            return parse_product_info(link)
        else:
            pass


if __name__ == '__main__':
    proxyVault = ['103.110.37.244:36022', '180.254.218.229:8080', '110.74.197.207:50632', '1.20.101.95:49001', '200.10.193.90:8080', '173.164.26.117:3128', '103.228.118.66:43002', '178.128.231.201:3128', '1.2.169.54:55312', '181.52.85.249:31487', '97.64.135.4:8080', '190.96.214.123:53251', '52.144.107.142:31923', '45.5.224.145:52035', '89.218.22.178:8080', '192.241.143.186:80', '113.53.29.218:38310', '36.78.131.182:39243']
    pool = Pool(3)
    pool.map(parse_product_info,linklist)

How can I modify the parse_product_info() function so that it sticks to one proxy as long as it's working?

【Comments】:

  • Note that multiprocessing.dummy does not use multiple processes, only threads. Real multiprocessing behaves differently; specifically, global data is not shared.
  • Could you name or suggest any library that behaves exactly the way multiprocessing is supposed to? Forgive my ignorance @MisterMiyagi. Thanks.
  • In that case, I believe you can read/write proxy from/to a file
  • @robots.txt The plain multiprocessing module does use proper processes. multiprocessing.dummy merely replicates the same API on top of threads.

Tags: python multithreading web-scraping proxy multiprocessing


【Solution 1】:

First of all: despite importing from the multiprocessing package, you are using multithreading here, because .dummy uses threads instead of processes.

I originally assumed multithreading was fine for the OP, since the example showed no sign of heavy CPU-bound work, but since we now know the OP may indeed want to use multiprocessing, I'm also providing a multiprocessing solution.

The OP's example needs a rework of the synchronization around the whole proxy handling. I simplified the example by mocking the request part and dropping the BeautifulSoup part, since neither matters for the question.


Multiprocessing

This solution uses a multiprocessing.Value as a shared counter to index into the proxy list. A worker increments the shared index when it runs into a timeout. With the help of Pool's initializer parameter, the shared counter and the proxy list get registered once in each (worker) process at startup.

It's important to use a lock for any non-atomic operation on a shared, non-static resource. A multiprocessing.Value comes with a multiprocessing.RLock attached by default, which we can use.

import time
import random
import logging
from multiprocessing import Pool, Value, get_logger, log_to_stderr


def request_get(link, proxies, timeout):
    """Dummy request.get()"""
    res = random.choices(["Result", "Timeout"], [0.5, 0.5])
    if res[0] == "Result":
        time.sleep(random.uniform(0, timeout))
        return f"{res[0]} from {link}"
    else:
        time.sleep(timeout)
        raise TimeoutError


def parse_product_info(link):
    global proxy_list, proxy_index    
    while True:
        with proxy_index.get_lock():
            idx = proxy_index.value
        try:
            proxy = {'https': proxy_list[idx]}
        except IndexError:
            # get_logger().info(f"No proxies left.")
            return    
        try:
            # get_logger().info(f"attempt using: {proxy}")
            res = request_get(link, proxies=proxy, timeout=5)
        except TimeoutError:
            # get_logger().info(f"timeout with: {proxy}")
            with proxy_index.get_lock():
                # check with lock held if index is still the same
                if idx == proxy_index.value:
                    proxy_index.value += 1
                    # get_logger().info(f"incremented index: {proxy_index.value}")
        else:
            # get_logger().info(f"processing: {res}")
            return    


def _init_globals(proxy_list, proxy_index):
    globals().update(
        {'proxy_list': proxy_list, 'proxy_index': proxy_index}
    )

Main:

if __name__ == '__main__':

    log_to_stderr(logging.INFO)

    links = [
        'https://www.amazon.com/dp/B00OI0RGGO',
        'https://www.amazon.com/dp/B00TPKOPWA',
        'https://www.amazon.com/dp/B00TH42HWE',
        'https://www.amazon.com/dp/B00TPKNREM',
    ]

    proxies = [
        '103.110.37.244:36022', '180.254.218.229:8080', '110.74.197.207:50632',
        '1.20.101.95:49001', '200.10.193.90:8080', '173.164.26.117:3128',
        '103.228.118.66:43002', '178.128.231.201:3128', '1.2.169.54:55312',
        '181.52.85.249:31487', '97.64.135.4:8080', '190.96.214.123:53251',
        '52.144.107.142:31923', '45.5.224.145:52035', '89.218.22.178:8080',
        '192.241.143.186:80', '113.53.29.218:38310', '36.78.131.182:39243'
    ]
    proxies = [f"http://{proxy}" for proxy in proxies]
    proxy_index = Value('i', 0)

    with Pool(
            processes=3,
            initializer=_init_globals,
            initargs=(proxies, proxy_index)
    ) as pool:

        pool.map(parse_product_info, links)

Example output:

[INFO/MainProcess] allocating a new mmap of length 4096
[INFO/ForkPoolWorker-1] child process calling self.run()
...
[INFO/ForkPoolWorker-1] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-2] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-2] processing: Result from https://www.amazon.com/dp/B00TPKOPWA
[INFO/ForkPoolWorker-2] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] timeout with: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] incremented index: 1
[INFO/ForkPoolWorker-3] attempt using: {'https': 'http://180.254.218.229:8080'}
[INFO/ForkPoolWorker-1] timeout with: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-1] attempt using: {'https': 'http://180.254.218.229:8080'}
[INFO/ForkPoolWorker-3] processing: Result from https://www.amazon.com/dp/B00TH42HWE
[INFO/ForkPoolWorker-2] processing: Result from https://www.amazon.com/dp/B00TPKNREM
[INFO/ForkPoolWorker-1] processing: Result from https://www.amazon.com/dp/B00OI0RGGO
[INFO/ForkPoolWorker-3] process shutting down
[INFO/ForkPoolWorker-2] process shutting down
...

Process finished with exit code 0

Multithreading

The proposal below synchronizes the proxy handling with the help of a threading.Lock (also available wrapped as multiprocessing.dummy.Lock), which is possible because multiprocessing.dummy uses only threads.

Note that multiprocessing.Lock (not the one from .dummy) is, by comparison, a heavyweight (relatively slow) IPC lock that will affect overall performance depending on how often you synchronize.
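That cost difference is easy to measure yourself. A rough micro-benchmark sketch; absolute timings vary by machine and OS, so no concrete numbers are claimed here:

```python
import time
from threading import Lock as ThreadLock
from multiprocessing import Lock as ProcessLock


def time_lock(lock, n=100_000):
    # Time n acquire/release cycles on the given lock.
    start = time.perf_counter()
    for _ in range(n):
        with lock:
            pass
    return time.perf_counter() - start


if __name__ == '__main__':
    # The process lock is backed by an OS semaphore, so it is expected to be
    # noticeably slower than the interpreter-level thread lock.
    print(f"threading.Lock:       {time_lock(ThreadLock()):.4f}s")
    print(f"multiprocessing.Lock: {time_lock(ProcessLock()):.4f}s")
```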

Edit:

The multithreading solution has been refactored from an earlier draft to adopt the design of the multiprocessing solution above. parse_product_info() is now nearly identical for multithreading/multiprocessing.

import time
import random
import logging
from itertools import repeat
from multiprocessing.dummy import Pool, Lock
get_logger = logging.getLogger


def request_get(link, proxies, timeout):
    ... # same as in multiprocessing solution above


def parse_product_info(link):
    global proxies, proxy_index
    while True:
        with proxy_lock:
            idx_proxy = proxy_index
        try:
            proxy = {'https': proxies[idx_proxy]}
        except IndexError:
            # get_logger().info(f"No proxies left.")
            return
        try:
            # get_logger().info(f"attempt using: {proxy}")
            res = request_get(link, proxies=proxy, timeout=5)
        except TimeoutError:
            # get_logger().info(f"timeout with: {proxy}")
            with proxy_lock:
                if idx_proxy == proxy_index:
                    proxy_index += 1
                    # get_logger().info(f"incremented index:{proxy_index}")
        else:
            # get_logger().info(f"processing: {res}")
            return    


def init_logging(level=logging.INFO):
    fmt = '[%(asctime)s %(threadName)s] --- %(message)s'
    logging.basicConfig(format=fmt, level=level)
    return logging.getLogger()

Main:

if __name__ == '__main__':

    init_logging()

    links = ... # same as in multiprocessing solution above
    proxies = ... # same as in multiprocessing solution above
    proxy_index = 0
    proxy_lock = Lock()

    with Pool(processes=3) as pool:
        pool.map(parse_product_info, links)

Example output:

[2019-12-18 01:40:25,799 Thread-1] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:25,799 Thread-2] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:25,799 Thread-3] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:26,164 Thread-1] --- processing: Result from https://www.amazon.com/dp/B00OI0RGGO
[2019-12-18 01:40:26,164 Thread-1] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:29,568 Thread-1] --- processing: Result from https://www.amazon.com/dp/B00TPKNREM
[2019-12-18 01:40:30,800 Thread-2] --- timeout with: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:30,800 Thread-2] --- incremented index: 1
[2019-12-18 01:40:30,800 Thread-2] --- attempt using: {'https': 'http://180.254.218.229:8080'}
[2019-12-18 01:40:30,800 Thread-3] --- timeout with: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:30,801 Thread-3] --- attempt using: {'https': 'http://180.254.218.229:8080'}
[2019-12-18 01:40:32,941 Thread-3] --- processing: Result from https://www.amazon.com/dp/B00TH42HWE
[2019-12-18 01:40:34,677 Thread-2] --- processing: Result from https://www.amazon.com/dp/B00TPKOPWA

Process finished with exit code 0

Re OP's latest comment:

If you like, you can swap in a new proxy list inside the IndexError exception-handler block once all proxies have been used up. In the code, you replace the return with:

        with proxy_lock:
            proxies = new_proxies
            proxy_index = 0
        continue

【Discussion】:

  • It seems to work the right way. However, it's a bit heavy for my current use case.
  • @robots.txt What do you mean by heavy? Btw, I'll also post a multiprocessing solution if you really need it.
  • I tried yours @Darkonaut. It's just so elegant!!!! Thanks a lot.
  • Is it possible to keep the existing logic you showed in the Multithreading approach intact @Darkonaut, while using proxies.pop() instead of proxies[idx_proxy]? The reason I'd prefer proxies.pop() is that I will (for future use) keep track of the proxy list to check whether it's empty, so that I can update the list while the script is running. That's actually how I usually do it. Thanks
  • @robots.txt So you don't have a finite number of proxies from the start? You don't need to switch to .pop() to update your list. You can update the proxy list however you like in the exception-handler block where you get the IndexError, since that's where you've reached the end of the list. See the last edit for the code.
【Solution 2】:

The problem you're running into is a race condition: multiple processes (threads, since you use multiprocessing.dummy) see proxy as uninitialized and try to fetch a new proxy.

You can implement a get_proxy function using Python's multiprocessing.Lock:

from multiprocessing import Lock

lock = Lock()

def get_proxy():
    global proxy
    try:
        # If proxy already set, then no need to wait for lock
        if proxy:
            return proxy
    except NameError:
        pass
    with lock:
        try:
            # If proxy was set when waiting for lock, then don't process another
            if proxy:
                return proxy
        except NameError:
            pass
        proxy = process_proxy()
        return proxy
    return None

Use it together with a clear_proxy function:

def clear_proxy(non_working_proxy):
    global proxy
    if non_working_proxy is None:
        return
    with lock:
        if proxy == non_working_proxy:
            proxy = None

Then call them in the parse_product_info function instead of process_proxy:

def parse_product_info(link):
    # global proxy       # Remove this
    proxy = get_proxy()  # Add this
    try:
        ...

    except Exception:
        # proxy = process_proxy()  # Remove this
        clear_proxy(proxy)         # Add this to clear a non-working proxy
        proxy = get_proxy()        # Add this to queue up to get new proxy
        if proxy!=None:
            return parse_product_info(link)
        else:
            pass
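The pattern above is double-checked locking: a lock-free fast path for the common case, and a second check under the lock for the race. A self-contained sketch of how get_proxy and clear_proxy interact; it initializes proxy = None up front to avoid the NameError handling, and trims the question's vault to two entries:

```python
from multiprocessing.dummy import Lock  # a thread lock, matching .dummy usage

lock = Lock()
proxy = None
proxyVault = ['103.110.37.244:36022', '180.254.218.229:8080']


def process_proxy():
    # Pop the next candidate, as in the question's helper.
    global proxyVault
    if proxyVault:
        return {'https': f'http://{proxyVault.pop()}'}
    return None


def get_proxy():
    global proxy
    if proxy:              # fast path: no lock needed if already set
        return proxy
    with lock:
        if proxy:          # re-check: another thread may have set it meanwhile
            return proxy
        proxy = process_proxy()
        return proxy


def clear_proxy(non_working_proxy):
    global proxy
    if non_working_proxy is None:
        return
    with lock:
        if proxy == non_working_proxy:  # only clear if nobody replaced it yet
            proxy = None


bad = get_proxy()   # hands out the last vault entry
clear_proxy(bad)    # mark it dead
print(get_proxy())  # {'https': 'http://103.110.37.244:36022'}
```

All threads share one current proxy this way, and only the thread that observed the failure triggers the replacement.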

A note about multiprocessing.dummy

MisterMiyagi mentioned in a comment on the question:

Note that multiprocessing.dummy does not use multiple processes, only threads. Real multiprocessing behaves differently; specifically, global data is not shared.

In that case, I believe you can read/write proxy from/to a file inside the with lock block

【Discussion】:
