【问题标题】:Send panda DataFrame's between processes在进程之间发送 pandas DataFrame
【发布时间】:2021-10-15 01:28:17
【问题描述】:

我不想分享,只是发送一个DataFrame从一个进程到另一个进程。

主要的DataFrame被切割成小块,每一块都由一个单独的进程(在pythonsmultiprocessing的含义中)在它自己的CPU核心上处理。在“子”进程完成后,它们应该发回结果数据帧以再次将它们连接在一起。

但在我的示例中,我到达了脚本的 END,但 DataFrame 永远不会通过 multiprocessing.Queue 对象发回。

#!/usr/bin/env python3
import multiprocessing
import pandas as pd

def worker(df, queue):
    print(multiprocessing.current_process())
    # create new column
    df['X'] = df.b + '-' + df.c
    # modify existing column
    df.d = df.d.apply(lambda x: x.upper())
    # send it back to main process
    queue.put(df)  # tried .copy() also!


if __name__ == '__main__':
    print(pd.__version__)

    # initial data
    df = pd.DataFrame({
        'a': ['A', 'A', 'B', 'B'],
        'b': list('XXXX'),
        'c': list('6218'),
        'd': ['zwei', 'zwei', 'vier', 'neuen']
    })
    # slice the data frame
    df_parts = [
        df.iloc[:2].copy(),
        df.iloc[2:].copy()
    ]

    processes = []

    queue = multiprocessing.Queue()

    for i in range(len(df_parts)):
        p = multiprocessing.Process(target=worker,
                                    args=(df_parts[i], queue) )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    while queue.full():
        print(queue.get())

    print('END')

我知道在multiprocessing.Process 实例之间发送数据是在后台腌制的。但我没有收到任何错误。

输出:

1.2.5
<Process name='Process-2' parent=14112 started>
<Process name='Process-1' parent=14112 started>
END
>>>

【问题讨论】:

    标签: python pandas multiprocessing


    【解决方案1】:

    你有几个问题:

    1. 根据Queue.full() 的文档:

    如果队列已满,则返回 True,否则返回 False。由于多线程/多处理语义,这是不可靠的。

    所以你不应该使用这种方法。

    1. 必须永远不要尝试针对multiprocessig.Queue 实例发出get,该实例是在您加入该子进程后由子进程写入的。来自文档:

    这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会在退出时挂起。

    1. 您无法确定您创建的两个进程将其输出写入您创建的单个输出队列的顺序。如果您想确保以正确的顺序获得输出,请为每个进程创建一个单独的输出队列实例。这也简化了您的get 处理。如果您确实想使用一个输出队列,您知道每个进程正在写入一条消息并且您有 N 个进程,因此只需发出 N 个 get 调用,您就完成了队列的处理。
    #!/usr/bin/env python3
    import multiprocessing
    import pandas as pd
    
    def worker(df, queue):
        print(multiprocessing.current_process())
        # create new column
        df['X'] = df.b + '-' + df.c
        # modify existing column
        df.d = df.d.apply(lambda x: x.upper())
        # send it back to main process
        queue.put(df)  # tried .copy() also!
    
    
    if __name__ == '__main__':
        print(pd.__version__)
    
        # initial data
        df = pd.DataFrame({
            'a': ['A', 'A', 'B', 'B'],
            'b': list('XXXX'),
            'c': list('6218'),
            'd': ['zwei', 'zwei', 'vier', 'neuen']
        })
        # slice the data frame
        df_parts = [
            df.iloc[:2].copy(),
            df.iloc[2:].copy()
        ]
    
        processes = []
        queues = []
    
        for i in range(len(df_parts)):
            queue = multiprocessing.Queue()
            queues.append(queue)
            p = multiprocessing.Process(target=worker,
                                        args=(df_parts[i], queue) )
            processes.append(p)
            p.start()
    
        for queue in queues:
            print(queue.get())
    
        for p in processes:
            p.join()
    
    
        print('END')
    

    打印:

    1.3.0
    <Process name='Process-1' parent=7748 started>
    <Process name='Process-2' parent=7748 started>
       a  b  c     d    X
    0  A  X  6  ZWEI  X-6
    1  A  X  2  ZWEI  X-2
       a  b  c      d    X
    2  B  X  1   VIER  X-1
    3  B  X  8  NEUEN  X-8
    END
    

    使用一个输出队列:

        processes = []
        queue = multiprocessing.Queue()
    
        for i in range(len(df_parts)):
            p = multiprocessing.Process(target=worker,
                                        args=(df_parts[i], queue) )
            processes.append(p)
            p.start()
    
        for _ in range(len(processes)):
            print(queue.get())
    
        for p in processes:
            p.join()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-08
      • 2017-06-09
      • 2020-01-08
      • 2011-01-27
      • 1970-01-01
      • 1970-01-01
      • 2023-03-25
      • 1970-01-01
      相关资源
      最近更新 更多