【问题标题】:Running multiple asynchronous function and get the returned value of each function运行多个异步函数并获取每个函数的返回值
【发布时间】:2016-11-10 20:17:27
【问题描述】:

我试图创建一个可以异步运行多个进程并发送响应的函数。由于multiprocessing.Process()不返回响应,我想创建一个函数为:

from multiprocessing import Process

def async_call(func_list):
    """
    Runs the list of function asynchronously.

    :param func_list: Expects list of lists to be of format
        [[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
    :return: List of output of the functions
        [output1, output2, ...]
    """
    response_list = []
    def worker(function, f_args, f_kwargs, response_list):
        """
        Runs the function and appends the output to list
        """
        response = function(*f_args, **f_kwargs)
        response_list.append(response)

    processes = [Process(target=worker, args=(func, args, kwargs, response_list)) \
                    for func, args, kwargs in func_list]

    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return response_list

在这个函数中,我异步调用worker,它接受附加参数为list。由于列表作为参考传递,我想我可以在列表中附加实际函数的响应。 async_call 会返回所有函数的响应。

但这并不符合我的预期。值被附加到worker() 内的list,但在工作人员之外response_list 列表仍然为空。

知道我做错了什么吗?而且,有没有其他方法可以实现我正在做的事情?

【问题讨论】:

    标签: python asynchronous multiprocessing


    【解决方案1】:

    您不能直接跨进程共享对象。您需要使用专门为传递值而设计的类之一,即 Queue 和 Pipe;见the documentation

    【讨论】:

      【解决方案2】:

      正如Daniel's Answer 中提到的,对象不能在进程之间直接共享。但是,multiprocessing 库提供了 QueuesPipes 作为进程之间的通信通道。 (阅读documentation了解更多详情)

      这是我使用multiprocessing.Queue() 创建的函数:

      def async_call(func_list):
          """
          Runs the list of function asynchronously.
      
          :param func_list: Expects list of lists to be of format
              [[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
          :return: List of output of the functions
              [output1, output2, ...]
          """
          def worker(function, f_args, f_kwargs, queue, index):
              """
              Runs the function and appends the output to list, and the Exception in the case of error
              """
              response = {
                  'index': index,  # For tracking the index of each function in actual list.
                                   # Since, this function is called asynchronously, order in
                                   # queue may differ
                  'data': None,
                  'error': None
              }
      
              # Handle error in the function call
              try:
                  response['data'] = function(*f_args, **f_kwargs)
              except Exception as e:
                  response['error'] = e  # send back the exception along with the queue
      
              queue.put(response)
          queue = Queue()
          processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \
                          for i, (func, args, kwargs) in enumerate(func_list)]
          
          for process in processes:
              process.start()
      
          response_list = []
          for process in processes:
              # Wait for process to finish
              process.join()
      
              # Get back the response from the queue
              response = queue.get()
              if response['error']:
                  raise response['error']   # Raise exception if the function call failed
              response_list.append(response)
      
          return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])]
      

      示例运行:

      def my_sum(x, y):
          return x + y
      
      def your_mul(x, y):
          return x*y
      
      my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]]
      
      async_call(my_func_list)
      # Value returned: [3, 2]
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-11-11
        • 2019-03-04
        • 1970-01-01
        • 2018-04-09
        • 2023-03-11
        • 1970-01-01
        • 2021-01-09
        相关资源
        最近更新 更多