【问题标题】:Use objects in parallel processing and "restart" these在并行处理中使用对象并“重新启动”这些对象
【发布时间】:2021-09-25 13:46:33
【问题描述】:

目标

我想从网页中检索信息(并行)。一旦其中一个“爬虫”找到我们正在寻找的结果,我们就会终止,如果没有,我们刷新我们刚刚查看的页面并再次搜索。换一种说法:

  • 在 3 个进程中打开网页(同一页面,延迟 X 秒)
  • 一有结果就返回(每个进程不是一次全部)
  • 如果这个结果 ==True 我们完成并终止池
  • 如果没有,我们调用.restart() 并再次将其添加到池中
  • 重复

代码端

爬取类

让我们首先定义Scraper 对象:

import random
import time
import multiprocessing

# Result simulation array, False is much more likely than True
RES = [True, False, False, False, False, False, False, False, False, False, False, False, False]

class Scrape:

    def __init__(self, url):
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def scrape(self):
        # Go to url
        # simulate work
        time.sleep(random.randrange(5))
        # simulate result
        result = RES[random.randrange(13)]
        # Return what we found on the page
        self.result = result

    def restart(self):
        # >> Some page refreshing
        self.scrape()

所以我们去网页做一些工作(scrape)然后我们可以通过get_result访问结果,如果这不是我们想要的,我们可以调用restart。请注意,实际上这个类要复杂得多,因此重新创建它会浪费启动驱动程序(与通过restart 重用同一类相比)

并行代码

这是我卡住的地方,虽然我使用了数百次 map,但我不知道如何保留 Scrape 对象并调用 restart 并将它们再次添加到池中。我在想这样的事情,但这并不像我想要的那样工作。也许队列是一个更好的方法,但我不熟悉它。

# Function to create the scrapers
def obj_create(url):
    print('Create')
    a = Scrape(url)
    a.scrape()
    return a

# Function to restart the scraper
def obj_restart(a):
    print('Restart')
    a.restart()
    a.scrape()
    return a

# Callback
def call_back(scrape_obj):
    if scrape_obj.get_result():
        pool.terminate()
        # Also somehow return the result...
    else:
        # Restart and add again
        pool.apply_async(obj_restart, scrape_obj, callback=call_back)


pool = multiprocessing.Pool(3)
url = 'test.com'
delay = 0.001
n = 3

for i in range(n):
    time.sleep(delay)
    pool.apply_async(obj_create, url, callback=call_back)

pool.close()
pool.join()

我尽力制作这个可重现的例子,并尽我所能解释它,但如果有任何不清楚的地方,请告诉我!

【问题讨论】:

    标签: python performance asynchronous parallel-processing


    【解决方案1】:

    蟒蛇

    并行

    您似乎从未听说过GIL?如果是这样,很抱歉让您失望了,但您无法获得真正的并行性。

    不止于此。在您描述的情况下..您不需要一个。因为大多数时候你的线程会被网络接口阻塞来回复。因此,您需要一个好的异步框架,即事件循环,它将优化您的执行,从而将阻塞减少到可能的最低限度。

    让我介绍aiohttp。它为您提供了正确实现的 HTTP(s) 调用,而光荣的 asyncio 为您提供了取消、重试和处理错误的简洁机制。

    特别是看看gather

    附: multiprocessing 将使事情并行化,但由于任务的 IO 绑定性质,这里有点过分了。您可以留在同一个进程中,并通过拒绝这种并行性来避免所有多处理地狱,这两种方式在大多数情况下都会被阻止。

    【讨论】:

    • 谢谢!,但是我对此并不完全确定,我的“抓取”不仅仅是像 aiohttp 用于的 http 请求(据我所知),而是涉及 google chrome 配置文件、chrome扩展、点击框等,都通过 selenium。那么这仍然适用吗?
    • 是的。它会。 aiohttp 并没有规定您如何处理其余问题。它只是为您提供了一种正确处理 IO 的方法,在您使用网络的情况下,让您保持在同一个 RAM 中(因此,使整个应用程序变得更加简单)。
    • 你能写出/你有类似的代码示例吗?我看到它每个线程都阻塞,但每个线程都有自己的驱动程序,如果我不想使用不同的驱动程序,我想我需要从 selenium 切换(参见stackoverflow.com/questions/50303797/…
    • @CodeNoob 不,我不确定 Selenium,我猜有些异步实现“可以正常工作”,但还没有尝试过。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-12
    • 2016-11-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多