【问题标题】:Python socketio with multiprocessing具有多处理功能的 Python socketio
【发布时间】:2021-03-28 04:59:48
【问题描述】:

所以我一直在努力解决这个让我发疯的泡菜错误。我有以下带有以下代码的主引擎类:

import eventlet
import socketio
import multiprocessing
from multiprocessing import Queue
from multi import SIOSerever

class masterEngine:

    if __name__ == '__main__': 
            
        serverObj = SIOSerever()

        try:
            receiveData = multiprocessing.Process(target=serverObj.run)
            receiveData.start()

            receiveProcess = multiprocessing.Process(target=serverObj.fetchFromQueue)
            receiveProcess.start()

            receiveData.join()
            receiveProcess.join()
            
        except Exception as error:
            print(error)

我还有另一个名为 multi 的文件,其运行如下:

import multiprocessing
from multiprocessing import Queue
import eventlet
import socketio

class SIOSerever:

  def __init__(self):
    self.cycletimeQueue = Queue()
    self.sio = socketio.Server(cors_allowed_origins='*',logger=False)
    self.app = socketio.WSGIApp(self.sio, static_files={'/': 'index.html',})
    self.ws_server = eventlet.listen(('0.0.0.0', 5000))

    @self.sio.on('production')
    def p_message(sid, message):
      self.cycletimeQueue.put(message)
      print("I logged : "+str(message))

  def run(self):
    eventlet.wsgi.server(self.ws_server, self.app)

  def fetchFromQueue(self):
    while True:
      cycle = self.cycletimeQueue.get()
      print(cycle)

如您所见,我可以尝试创建两个我想独立运行的 def run 和 fetchFromQueue 进程。

我的 run 函数启动 python-socket 服务器,我从 html 网页发送一些数据到该服务器(无需多处理即可完美运行)。然后我尝试将接收到的数据推送到队列,以便我的其他函数可以检索它并使用接收到的数据。

我需要对从套接字接收到的数据执行一组耗时操作,这就是我将其全部推入队列的原因。

在运行主引擎类时,我收到以下信息:

Can't pickle <class 'threading.Thread'>: it's not the same object as threading.Thread
I ended!
[Finished in 0.5s]

你能帮我解决我做错了什么吗?

【问题讨论】:

    标签: python multiprocessing pickle python-multiprocessing python-socketio


    【解决方案1】:

    来自多处理programming guidelines

    将资源显式传递给子进程

    在使用 fork start 方法的 Unix 上,子进程可以使用在使用全局资源的父进程中创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。

    除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程仍然存在,该对象就不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾回收时释放了某些资源,这可能很重要。

    因此,我稍微修改了您的示例,删除了所有不必要的内容,但展示了一种将共享队列显式传递给所有使用它的进程的方法:

    import multiprocessing
    
    MAX = 5
    
    class SIOSerever:
    
      def __init__(self, queue):
        self.cycletimeQueue = queue
    
      def run(self):
        for i in range(MAX):
          self.cycletimeQueue.put(i)
    
      @staticmethod
      def fetchFromQueue(cycletimeQueue):
        while True:
          cycle = cycletimeQueue.get()
          print(cycle)
          if cycle >= MAX - 1:
            break
    
    
    def start_server(queue):
        server = SIOSerever(queue)
        server.run()
    
    
    if __name__ == '__main__':
    
        try:
            queue = multiprocessing.Queue()
            receiveData = multiprocessing.Process(target=start_server, args=(queue,))
            receiveData.start()
    
            receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
            receiveProcess.start()
    
            receiveData.join()
            receiveProcess.join()
    
        except Exception as error:
            print(error)
    
    0
    1
    ...
    

    【讨论】:

    • 那么在上面的例子中,Queue的最大尺寸定义为5对吧?如果我想要不定尺寸怎么办?另外,我想将从套接字接收到的数据放入队列中。您所做的只是添加数字。
    • 队列是无限的,我使用 5 个项目来进行演示,以使程序终止。您可以将 int 放入队列 str 或 bytes 而不是。
    • 你能帮我看看如何将我从套接字服务器接收到的数据推送到队列中吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多