【发布时间】:2021-09-25 13:46:33
【问题描述】:
目标
我想从网页中检索信息(并行)。一旦其中一个“爬虫”找到我们正在寻找的结果,我们就会终止,如果没有,我们刷新我们刚刚查看的页面并再次搜索。换一种说法:
- 在 3 个进程中打开网页(同一页面,延迟 X 秒)
- 一有结果就返回(每个进程不是一次全部)
- 如果这个结果
==True我们完成并终止池 - 如果没有,我们调用
.restart()并再次将其添加到池中 - 重复
代码端
爬取类
让我们首先定义Scraper 对象:
import random
import time
import multiprocessing
# Result simulation array, False is much more likely than True
RES = [True, False, False, False, False, False, False, False, False, False, False, False, False]
class Scrape:
def __init__(self, url):
self.url = url
self.result = None
def get_result(self):
return self.result
def scrape(self):
# Go to url
# simulate work
time.sleep(random.randrange(5))
# simulate result
result = RES[random.randrange(13)]
# Return what we found on the page
self.result = result
def restart(self):
# >> Some page refreshing
self.scrape()
所以我们去网页做一些工作(scrape)然后我们可以通过get_result访问结果,如果这不是我们想要的,我们可以调用restart。请注意,实际上这个类要复杂得多,因此重新创建它会浪费启动驱动程序(与通过restart 重用同一类相比)
并行代码
这是我卡住的地方,虽然我使用了数百次 map,但我不知道如何保留 Scrape 对象并调用 restart 并将它们再次添加到池中。我在想这样的事情,但这并不像我想要的那样工作。也许队列是一个更好的方法,但我不熟悉它。
# Function to create the scrapers
def obj_create(url):
print('Create')
a = Scrape(url)
a.scrape()
return a
# Function to restart the scraper
def obj_restart(a):
print('Restart')
a.restart()
a.scrape()
return a
# Callback
def call_back(scrape_obj):
if scrape_obj.get_result():
pool.terminate()
# Also somehow return the result...
else:
# Restart and add again
pool.apply_async(obj_restart, scrape_obj, callback=call_back)
pool = multiprocessing.Pool(3)
url = 'test.com'
delay = 0.001
n = 3
for i in range(n):
time.sleep(delay)
pool.apply_async(obj_create, url, callback=call_back)
pool.close()
pool.join()
我尽力制作这个可重现的例子,并尽我所能解释它,但如果有任何不清楚的地方,请告诉我!
【问题讨论】:
标签: python performance asynchronous parallel-processing