【问题标题】:CherryPy, Multiprocessing and Gevent monkey patchingCherryPy、多处理和 Gevent 猴子补丁
【发布时间】:2012-11-15 17:32:17
【问题描述】:

我正在尝试使用cherrypy + multiprocessing(启动工作“进程”)+ gevent(从工作“进程”内启动并行 i/o greenlets)的组合。似乎最简单的方法是monkeypatch multiprocessing,因为greenlets只能在主应用程序进程中运行。

但是,猴子修补似乎适用于多处理的某些部分,而不适用于其他部分。这是我的示例 CherryPy 服务器:

from gevent import monkey
monkey.patch_all()

import gevent
import cherrypy
import multiprocessing

def launch_testfuncs():
    jobs = [gevent.spawn(testfunc)
            for i in range(0, 12)]

    gevent.joinall(jobs, timeout=10)

def testfunc():
    print 'testing'

class HelloWorld(object):
    def index(self):
        launch_testfuncs()

        return "Hello World!"
    index.exposed = True

    def index_proc(self):
        proc = multiprocessing.Process(target=launch_testfuncs)
        proc.start()
        proc.join()

        return "Hello World 2!"
    index_proc.exposed = True

    def index_pool(self):
        pool = multiprocessing.Pool(1)
        return "Hello World 3!"
    index_pool.exposed = True

    def index_namespace(self):
        manager = multiprocessing.Manager()
        anamespace = manager.Namespace()
        anamespace.val = 23
        return "Hello World 4!"
    index_namespace.exposed = True


cherrypy.quickstart(HelloWorld())

猴子补丁后的以下工作:

  • index - 直接从cherrypy类中生成greenlets
  • index_proc - 使用 multiprocessing.Process 启动一个新进程,然后从该进程中生成 greenlets

以下有问题:

  • index_pool - 启动 multiprocessing.Pool - 挂起并且永不返回
  • index_namespace - 初始化 multiprocessing.Manager 命名空间以管理工作人员池/集合中的共享内存 - 返回以下错误消息:

    [15/Nov/2012:17:19:31] HTTP Traceback (most recent call last):
      File "/Library/Python/2.7/site-packages/cherrypy/_cprequest.py", line 656, in respond
    response.body = self.handler()
      File "/Library/Python/2.7/site-packages/cherrypy/lib/encoding.py", line 188, in __call__
    self.body = self.oldhandler(*args, **kwargs)
      File "/Library/Python/2.7/site-packages/cherrypy/_cpdispatch.py", line 34, in __call__
    return self.callable(*self.args, **self.kwargs)
      File "server.py", line 39, in index_namespace
    anamespace = manager.Namespace()
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
    conn = self._Client(self._address, authkey=self._authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256)        # reject large message
    IOError: [Errno 35] Resource temporarily unavailable
    

我尝试在 gevent 文档中找到一些与此相关的文档,但找不到与此相关的任何内容。只是gevent的猴子补丁不完整吗?有没有其他人遇到过类似的问题,有没有办法解决?

【问题讨论】:

  • 我还删除了 CherryPy 包装器,并尝试直接调用 index_pool() 和 index_namespace() 得到相同的结果——即解释器为前者挂起,并返回“资源暂时不可用”错误后一种情况下的消息。
  • 以下似乎也讨论了 multiprocessing.Manager() 的损坏,遗憾的是没有解决方案:stackoverflow.com/questions/8678307/…

标签: python multiprocessing cherrypy gevent monkeypatching


【解决方案1】:

问题似乎是由于gevent.socket 是非阻塞的,这意味着如果X 字节不能立即在套接字上可用,任何socket.recv_bytes(X) 调用都会引发该错误。具体来说,gevent.socket 旨在永远不会阻塞套接字。

出现multiprocessing 的问题是因为它使用了stdlib socket 模块并预计它会阻塞,而在您使用monkey.patch_all()'d 之后,socket 模块已被替换,并且multiprocessing.connection不是为处理新的异步行为而设计的。

您可以告诉monkey 不要修补socket,但这意味着在您的应用程序中利用异步套接字的任何东西都可能因此而导致一些性能损失。

要执行此操作,请调用 patch_allsocket=Falsepatch_all(socket=False)

这不是一个理想的解决方案,因为您几乎失去了最初使用 gevent 所获得的大部分好处。

【讨论】:

    【解决方案2】:

    我遇到了和你一样的问题。我的解决方案是我刚刚使用 multiprocessing.Process() 来生成固定数量的进程。最后全部加入,等待完成。

    #!/usr/bin/env python
    # encoding: utf-8
    
    from gevent import monkey
    monkey.patch_all()
    
    import gevent
    import multiprocessing as mp
    
    
    NUM = 10
    
    
    def work(i):
        jobs = [gevent.spawn(func, i)
                for i in range(0, 12)]
        gevent.joinall(jobs)
        print "{} Done {}".format(mp.current_process().name, i)
    
    
    def func(x):
        print "Gevent: {}".format(x)
    
    def main():
    
        processes = [mp.Process(name="Process-{}".format(i), target=work, args=(i,)) for i in xrange(NUM)]
    
        for process in processes:
            process.start()
    
        for process in processes:
            process.join()
    
    
    if __name__ == '__main__':
        main()
    

    输出

    Gevent: 0
    Gevent: 1
    Gevent: 2
    Gevent: 3
    Gevent: 4
    Gevent: 5
    Gevent: 6
    Gevent: 7
    Gevent: 8
    Gevent: 9
    Gevent: 10
    Gevent: 11
    Process-0 Done 11
    Gevent: 0
    Gevent: 1
    Gevent: 2
    Gevent: 3
    Gevent: 4
    Gevent: 5
    Gevent: 6
    Gevent: 7
    Gevent: 8
    Gevent: 9
    Gevent: 10
    Gevent: 11
    Process-1 Done 11
    Gevent: 0
    Gevent: 1
    Gevent: 2
    Gevent: 3
    Gevent: 4
    Gevent: 5
    Gevent: 6
    Gevent: 7
    Gevent: 8
    Gevent: 9
    Gevent: 10
    Gevent: 11
    Process-2 Done 11
    Gevent: 0
    ... ...
    

    所以这是 gevent 与多处理一起工作的解决方案。

    【讨论】:

      猜你喜欢
      • 2012-01-30
      • 2021-04-04
      • 2012-07-14
      • 2017-01-25
      • 1970-01-01
      • 2012-06-14
      • 1970-01-01
      • 2016-09-01
      • 2012-09-16
      相关资源
      最近更新 更多