【问题标题】:Fetching data with Python's asyncio in a sequential order使用 Python 的 asyncio 按顺序获取数据
【发布时间】:2014-06-16 15:05:33
【问题描述】:

我有一个 Python 2.7 程序,它从网站提取数据并将结果转储到数据库中。它遵循消费者生产者模型,使用线程模块编写。

只是为了好玩,我想使用新的 asyncio 模块(从 3.4 开始)重写这个程序,但我不知道如何正确地做到这一点。

最关键的要求是程序必须按顺序从同一网站获取数据。例如对于一个 url 'http://a-restaurant.com' 它应该首先得到 'http://a-restaurant.com/menu/0',然后是 'http://a-restaurant.com/menu/1', 然后 'http://a-restaurant.com/menu/2', ... 如果没有按顺序获取它们,则网站完全停止交付页面,您必须从 0 开始。

但是另一个网站的另一个抓取 ('http://another-restaurant.com') 可以(并且应该)同时运行(其他网站也有连续限制)。

线程模块非常适合这个,因为我可以为每个网站创建单独的线程,并且在每个线程中它可以等待一个页面完成加载,然后再获取另一个页面。

这是线程版本 (Python 2.7) 的一个非常简化的代码 sn-p:

class FetchThread(threading.Threading)
    def __init__(self, queue, url)
        self.queue = queue
        self.baseurl = url
    ...
    def run(self)
        # Get 10 menu pages in a sequantial order
        for food in range(10):
            url = self.baseurl + '/' + str(food)
            text = urllib2.urlopen(url).read()
            self.queue.put(text)
            ...
def main()
    queue = Queue.Queue()
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
    for url in urls:
        fetcher = FetchThread(queue, url)
        fetcher.start()
        ...

以下是我尝试使用 asyncio(在 3.4.1 中)的方法:

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)


l = []
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
for url in urls:
    for food in range(10):
        menu_url = url + '/' + str(food)
        l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))

它以非顺序的顺序获取和打印所有内容。好吧,我想这就是这些协程的全部想法。我不应该使用 aiohttp 而只使用 urllib 获取吗?但是第一家餐厅的抓取是否会阻止其他餐厅的抓取?我只是在想这完全错误吗? (这只是一个尝试按顺序获取内容的测试。还没有进入队列部分。)

【问题讨论】:

    标签: python asynchronous python-asyncio


    【解决方案1】:

    对于不关心请求顺序排序的餐厅,您当前的代码可以正常工作。菜单的所有十个请求将同时运行,并在完成后立即打印到标准输出。

    显然,这不适用于需要顺序请求的餐厅。您需要进行一些重构才能使其正常工作:

    @asyncio.coroutine
    def fetch(url):
        response = yield from aiohttp.request('GET', url)
        response = yield from response.read_and_close()
        return response.decode('utf-8')
    
    @asyncio.coroutine
    def print_page(url):
        page = yield from fetch(url)
        print(page)
    
    @syncio.coroutine
    def print_pages_sequential(url, num_pages):
        for food in range(num_pages):
            menu_url = url + '/' + str(food)
            yield from print_page(menu_url)
    
    l = [print_pages_sequential('http://a-restaurant.com/menu', 10)]
    
    conc_url = 'http://another-restaurant.com/menu'
    for food in range(10):
        menu_url = conc_url + '/' + str(food)
        l.append(print_page(menu_url))
    
    loop.run_until_complete(asyncio.wait(l))
    

    我们没有将连续餐厅的所有 10 个请求添加到列表中,而是将一个协程添加到列表中,它将依次遍历所有 10 个页面。其工作方式是yield from print_page 将停止执行print_pages_sequential,直到print_page 请求完成,但它不会阻塞任何其他同时运行的协程(就像您附加的所有print_page 调用一样到l)。

    通过这种方式,您的所有“another-restaurant”请求都可以完全同时运行,就像您想要的那样,并且您的“a-restaurant”请求将按顺序运行,但不会阻塞任何“another-restaurant” "请求。

    编辑:

    如果所有站点都具有相同的顺序抓取要求,则逻辑可以更简化:

    l = []
    urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"]
    for url in urls:
        menu_url = url + '/' + str(food)
        l.append(print_page_sequential(menu_url, 10))
    
    loop.run_until_complete(asyncio.wait(l))
    

    【讨论】:

    • 谢谢@dano。需要说明的是:所有餐厅都需要在其菜单中顺序获取,但我想同时从第一家餐厅和第二家餐厅获取数据(只是它们各自的菜单获取需要是顺序的)。所以我想解决方案是l = [print_pages_sequential('http://a-restaurant.com/menu', 10), print_pages_sequential('http://another-restaurant.com/menu', 10)],然后运行loop.run_until_complete(asyncio.wait(l))(现在无法测试。)
    • @user3313978 啊,对不起,我误解了这个要求。鉴于约束是正确的,您对解决方案的假设是正确的。我更新了答案以反映新的约束。
    • 这仍然不会按顺序启动请求,@dano。不幸的是,gatherwait 以不确定的顺序安排任何传入的协程,因为它将它们包装在 Tasks 中。见asyncio issue #432。一种解决方法是在将每个协程对象传递给gatherwait 之前手动为其分配一个循环任务。例如l.append(loop.create_task(print_page_sequential(menu_url, 10)))
    【解决方案2】:

    asyncio.Taskasyncio 世界中threading.Thread 的替代品。 asyncio.async 也会创建新任务。

    asyncio.gather是等待多个协程的非常方便的方式,我更喜欢它而不是asyncio.wait

    @asyncio.coroutine
    def fetch(url):
        response = yield from aiohttp.request('GET', url)
        response = yield from response.read_and_close()
        return response.decode('utf-8')
    
    @asyncio.coroutine
    def print_page(url):
        page = yield from fetch(url)
        print(page)
    
    @asyncio.coroutine
    def process_restaurant(url):
        for food in range(10):
            menu_url = url + '/' + str(food)
            yield from print_page(menu_url)
    
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
    coros = []
    for url in urls:
        coros.append(asyncio.Task(process_restaurant(url)))
    
    loop.run_until_complete(asyncio.gather(*coros))
    

    【讨论】:

    • 很高兴知道。 asyncio 似乎比我预期的要复杂一些。顺便说一句,def process_restaurant(url) 缺少一个缩进级别。
    • process_restaurant 的标记已修复。感谢您的报告。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-03
    • 1970-01-01
    • 2020-02-16
    • 1970-01-01
    • 2020-05-03
    • 1970-01-01
    相关资源
    最近更新 更多