【问题标题】:Streaming sockets from ProcessPoolExecutor来自 ProcessPoolExecutor 的流式套接字
【发布时间】:2014-05-11 15:20:13
【问题描述】:

我正在尝试创建一个 Python 应用程序,其中一个进程(进程“A”)接收请求并将其放入 ProcessPool(来自 concurrent.futures)。在处理此请求时,可能需要将消息传递给第二个进程(进程“B”)。我正在使用 tornado 的 iostream 模块来帮助包装连接并获得响应。

进程 A 未能从 ProcessPool 执行中成功连接到进程 B。我哪里错了?

客户端,它向进程 A 发出初始请求:

#!/usr/bin/env python

import socket
import tornado.iostream
import tornado.ioloop

def print_message ( data ):
    print 'client received', data

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(('localhost',2001))
stream.read_until('\0',print_message)
stream.write('test message\0')
tornado.ioloop.IOLoop().instance().start()

收到初始请求的进程 A:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import tornado.iostream
import socket
import concurrent.futures
import functools

def handle_request ( data ):
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
    out_stream = tornado.iostream.IOStream(s)
    out_stream.connect(('localhost',2002))
    future = out_stream.read_until('\0')
    out_stream.write(data+'\0')
    return future.result()

class server_a (tornado.tcpserver.TCPServer):

   def return_response ( self, in_stream, future ):
       in_stream.write(future.result()+'\0')

   def handle_read ( self, in_stream, data ):
       future = self.executor.submit(handle_request,data)
       future.add_done_callback(functools.partial(self.return_response,in_stream))

   def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

   def __init__ ( self ):
       self.executor = concurrent.futures.ProcessPoolExecutor()
       tornado.tcpserver.TCPServer.__init__(self)

server = server_a()
server.bind(2001)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

进程 B,应该接收来自进程 A 的中继请求:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import functools

class server_b (tornado.tcpserver.TCPServer):

    def handle_read ( self, in_stream, data ):
        in_stream.write('server B read'+data+'\0')

    def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

server = server_b()
server.bind(2002)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

最后,进程 A 返回的错误,在 'read_until' 方法期间引发:

ERROR:concurrent.futures:exception calling callback for <Future at 0x10654b890 state=finished raised OSError>
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 299, in _invoke_callbacks
    callback(self)
  File "./a.py", line 26, in return_response
    in_stream.write(future.result()+'\0')
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
OSError: [Errno 9] Bad file descriptor

【问题讨论】:

    标签: python sockets multiprocessing tornado concurrent.futures


    【解决方案1】:

    我不是 100% 确定您为什么会收到这个“错误的文件描述符”错误(不幸的是,concurrent.futures 在向后移植到 2.7 时丢失了回溯信息),但是 ProcessPoolExecutor 的工作进程中没有运行 IOLoop,所以你将无法在这种情况下使用像 IOStream 这样的 Tornado 构造(除非你为每个任务启动一个新的 IOLoop,但这可能没有多大意义,除非你需要与其他异步库兼容)。

    我也不确定以这种方式混合 tornado 的多进程模式和 ProcessPoolExecutor 是否有效。我认为您可能需要将 ProcessPoolExecutor 的初始化移动到 start(0) 调用之后。

    【讨论】:

      【解决方案2】:

      好的,我已经解决了这个问题,通过更新进程 A:

      def stop_loop ( future ):
          tornado.ioloop.IOLoop.current().stop()
      
      def handle_request ( data ):
          tornado.ioloop.IOLoop.clear_current()
          tornado.ioloop.IOLoop.clear_instance()
          s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
          out_stream = tornado.iostream.IOStream(s)
          out_stream.connect(('localhost',2002))
          future = out_stream.read_until('\0')
          future.add_done_callback(stop_loop)
          out_stream.write(data+'\0')
          tornado.ioloop.IOLoop.instance().start()
          return future.result()
      

      尽管 IOLoop 之前没有在生成的进程中启动,但它在调用当前实例时会返回其父循环。清除这些引用允许启动流程的新循环。不过,我不正式知道这里发生了什么。

      【讨论】:

        猜你喜欢
        • 2012-10-27
        • 1970-01-01
        • 1970-01-01
        • 2013-09-11
        • 2012-02-09
        • 1970-01-01
        • 2016-10-19
        • 2016-02-18
        • 2011-05-10
        相关资源
        最近更新 更多