【问题标题】:Throughput differences when using coroutines vs threading使用协程与线程时的吞吐量差异
【发布时间】:2012-03-04 02:16:17
【问题描述】:

几天前我问了一个关于 SO 的问题,关于帮助我设计一个用于构建多个 HTTP 请求的范例

这是场景。我想要一个多生产者、多消费者的系统。我的生产者抓取并抓取了一些网站,并将找到的链接添加到队列中。由于我将爬取多个站点,因此我希望有多个生产者/爬虫。

消费者/工作人员从这个队列中获取信息,向这些链接发出 TCP/UDP 请求,并将结果保存到我的 Django 数据库中。我也希望有多个工作人员,因为每个队列项目是完全独立的。

人们建议为此使用协程库,即 Gevent 或 Eventlet。从未使用过协程,我读到尽管编程范式类似于线程范式,但只有一个线程正在积极执行,但当阻塞调用发生时 - 例如 I/O 调用 - 堆栈在内存中切换,另一个绿色线程接管,直到遇到某种阻塞 I/O 调用。希望我做对了吗?这是我的一篇 SO 帖子中的代码:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

这很有效,因为sleep 调用是阻塞调用,当sleep 事件发生时,另一个绿色线程会接管。这比顺序执行快得多。 如您所见,我的程序中没有任何代码故意将一个线程的执行交给另一个线程。我看不出这如何适应上述场景,因为我希望所有线程同时执行。

一切正常,但我觉得我使用 Gevent/Eventlets 实现的吞吐量高于原始的顺序运行程序,但远低于使用真实线程可以实现的吞吐量。

如果我要使用线程机制重新实现我的程序,我的每个生产者和消费者都可以同时工作,而无需像协程那样交换堆栈。

是否应该使用线程重新实现?我的设计错了吗?我没有看到使用协程的真正好处。

也许我的概念有点混乱,但这就是我所吸收的。对我的范式和概念的任何帮助或澄清都会很棒。

谢谢

【问题讨论】:

  • 为什么不使用多个进程?
  • 我不知道多线程与多处理的优缺点,所以不知道好不好。
  • 在 Python 程序中没有“真正的线程”(在任何给定时间只有一个实际的 OS 线程执行)这样的东西,而无需求助于 C 扩展(或重量级 OS 进程),因为全局解释器锁定。
  • 您的生产者没有让步控制权。在生产者完成之前没有并发。

标签: python multithreading coroutine gevent


【解决方案1】:

当你有很多(绿色)线程时,gevent 很棒。我用数千人对其进行了测试,效果非常好。您已确保用于抓取和保存到数据库的所有库都变为绿色。 afaik 如果他们使用 python 的套接字,gevent 注入应该可以工作。但是,用 C 编写的扩展(例如 mysqldb)会阻塞,您需要改用绿色等效项。

如果你使用 gevent,你基本上可以取消队列,为每个任务生成新的(绿色)线程,线程的代码就像 db.save(web.get(address)) 一样简单。当 db 或 web 中的某些库阻塞时,gevent 将负责抢占。只要您的任务适合内存,它就会起作用。

【讨论】:

    【解决方案2】:

    在这种情况下,您的问题不在于程序速度(即选择 gevent 或线程),而在于网络 IO 吞吐量。那是(应该是)决定程序运行速度的瓶颈。

    Gevent 是确保瓶颈的一种好方法,而不是您的程序架构。

    这是您想要的过程:

    import gevent
    from gevent.queue import Queue, JoinableQueue
    from gevent.monkey import patch_all
    
    
    patch_all()  # Patch urllib2, etc
    
    
    def worker(work_queue, output_queue):
        for work_unit in work_queue:
            finished = do_work(work_unit)
            output_queue.put(finished)
            work_queue.task_done()
    
    
    def producer(input_queue, work_queue):
        for url in input_queue:
            url_list = crawl(url)
            for work in url_list:
                work_queue.put(work)
            input_queue.task_done()
    
    
    def do_work(work):
        gevent.sleep(0)  # Actually proces link here
        return work
    
    
    def crawl(url):
        gevent.sleep(0)
        return list(url)  # Actually process url here
    
    input = JoinableQueue()
    work = JoinableQueue()
    output = Queue()
    
    workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
    producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]
    
    
    list_of_urls = ['foo', 'bar']
    
    for url in list_of_urls:
        input.put(url)
    
    # Wait for input to finish processing
    input.join()
    print 'finished producing'
    # Wait for workers to finish processing work
    work.join()
    print 'finished working'
    
    # We now have output!
    print 'output:'
    for message in output:
        print message
    # Or if you'd like, you could use the output as it comes!
    

    您无需等待输入和工作队列完成,我刚刚在这里演示了这一点。

    【讨论】:

      【解决方案3】:

      如您所见,我的程序中没有任何故意的代码 将一个线程的执行交给另一个线程。我看不到 这如何适合上面的场景,因为我想拥有所有 线程同时执行。

      只有一个 OS 线程,但有几个 greenlet。在您的情况下,gevent.sleep() 允许工作人员同时执行。如果您使用已修补的urllib2gevent 一起工作(通过调用gevent.monkey.patch_*()),阻塞IO 调用(例如urllib2.urlopen(url).read())也会这样做。

      另请参阅A Curious Course on Coroutines and Concurrency 以了解代码如何在单线程环境中同时工作。

      要比较 gevent、线程、多处理之间的吞吐量差异,您可以编写与所有方法兼容的代码:

      #!/usr/bin/env python
      concurrency_impl = 'gevent' # single process, single thread
      ##concurrency_impl = 'threading' # single process, multiple threads
      ##concurrency_impl = 'multiprocessing' # multiple processes
      
      if concurrency_impl == 'gevent':
          import gevent.monkey; gevent.monkey.patch_all()
      
      import logging
      import time
      import random
      from itertools import count, islice
      
      info = logging.info
      
      if concurrency_impl in ['gevent', 'threading']:
          from Queue import Queue as JoinableQueue
          from threading import Thread
      if concurrency_impl == 'multiprocessing':
          from multiprocessing import Process as Thread, JoinableQueue
      

      脚本的其余部分对于所有并发实现都是相同的:

      def do_work(wid, value):
          time.sleep(random.randint(0,2))
          info("%d Task %s done" % (wid, value))
      
      def worker(wid, q):
          while True:
              item = q.get()
              try:
                  info("%d Got item %s" % (wid, item))
                  do_work(wid, item)
              finally:
                  q.task_done()
                  info("%d Done item %s" % (wid, item))
      
      def producer(pid, q):
          for item in iter(lambda: random.randint(1, 11), 10):
              time.sleep(.1) # simulate a green blocking call that yields control
              info("%d Added item %s" % (pid, item))
              q.put(item)
          info("%d Signal Received" % (pid,))
      

      不要在模块级别执行代码把它放在main():

      def main():
          logging.basicConfig(level=logging.INFO,
                              format="%(asctime)s %(process)d %(message)s")
      
          q = JoinableQueue()
          it = count(1)
          producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
          workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
          for t in producers+workers:
              t.daemon = True
              t.start()
      
          for t in producers: t.join() # put items in the queue
          q.join() # wait while it is empty
          # exit main thread (daemon workers die at this point)
      
      if __name__=="__main__":    
         main()
      

      【讨论】:

      • 嗨 Sebastian,我查看了我的代码,发现我的生产者和消费者同时工作。当我的一个greenlets中发生阻塞操作时,它会将控制权交给其他greenlets。我添加了缺少的monkey_patch 调用,以便套接字模块也不是阻塞的,但我的处理器无法得到足够的处理。一台普通的 PC 有足够的汁液来拥有更多的同时连接和更多的 greenlets,但我没有得到足够的速度。我非常迷茫和困惑,为什么它不使用更多的处理器并且工作得更快。你能帮我理解吗?我很失落。谢谢。
      • @Mridang Agarwalla:我已经评论了您在问题中发布的代码。 producers不要在里面同时工作。
      • @Mridang Agarwalla:如果您的问题是 IO 绑定(磁盘、网络),那么您的 CPU 有多快并不重要,例如,如果您只能以 50MB/s 的速度写入磁盘,那么它不会没关系,您的 CPU 可以处理 1GB/s。此外,您的程序可能会消耗其他有限资源,例如打开文件的数量。如果您使用 gevent,请确保所有阻塞调用都是“绿色”的,即它们不会阻塞,例如,您的数据库驱动程序可能与 gevent 不兼容。
      • @Mridang Agarwalla: 1. 连接池和绿色的单个 db 连接是单独的 问题,但如果需要,您可以使用单个工具获得这两个问题(我不需要不知道 django+gevent+postgres 首选什么)。 2. 如果一个请求调用了非绿色阻塞调用,那么它会阻塞整个解释器,并且在它返回之前无法处理其他请求(换句话说,如果调用需要相当长的时间,您的应用程序会很慢)。
      • @vumaasha:你的意思是我将如何回答标题中的问题? (我的回答更多是关于问题正文中的具体代码和误解)。如果我想回答这个标题:唯一重要的基准是您在硬件上的代码。多年来,双方都有支持的答案。尽管您需要的并发连接越多,OS 线程就越有可能不是答案。为了获得最大的吞吐量:找到一个瓶颈,消除它,重复。有时获得更好的电缆就足够了,有时您需要重新设计整个项目。这个问题太宽泛了。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-02
      • 2013-03-23
      相关资源
      最近更新 更多