【问题标题】:python multiprocessing: write to same excel filepython多处理:写入相同的excel文件
【发布时间】:2017-08-22 20:39:02
【问题描述】:

我是 Python 新手,我正在尝试将五个不同进程的结果保存到一个 excel 文件中(每个进程写入不同的工作表)。我在这里阅读了不同的帖子,但仍然无法完成,因为我对 pool.map、队列和锁非常困惑,而且我不确定这里需要什么来完成这项任务。 到目前为止,这是我的代码:

list_of_days = ["2017.03.20", "2017.03.21", "2017.03.22", "2017.03.23", "2017.03.24"]
results = pd.DataFrame()

if __name__ == '__main__':
    global list_of_days
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    nr_of_cores = multiprocessing.cpu_count()
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=nr_of_cores, initializer=init, initargs=(l,))
    pool.map(f, range(len(list_of_days)))
    pool.close()
    pool.join()

def init(l):
    global lock
    lock = l

def f(k):
    global results

    *** DO SOME STUFF HERE***

    results = results[ *** finished pandas dataframe *** ]

    lock.acquire()
    results.to_excel(writer, sheet_name=list_of_days[k])
    writer.save()
    lock.release()

结果是在 excel 中只创建了一张工作表(我假设它是最后完成的过程)。关于这段代码的一些问题:

  • 如何避免定义全局变量?
  • 甚至可以传递数据帧吗?
  • 我应该改为将锁定移至 main 吗?

非常感谢这里的一些输入,因为我认为掌握多处理是有用的。谢谢

【问题讨论】:

  • 如果您同时启动多个进程,您将遇到文件锁定问题,即每个进程都试图同时访问同一个文件。它们是不同的工作表并不重要,它仍然是同一个文件。此外,您的代码编写方式,每次都覆盖myfile.xlsx
  • 是的,当然,我需要将锁放在正确的位置,以便只有 1 个进程写入文件。第二点:你是对的,我从 f(k) 中删除了 writer,它现在只在 main 中,这应该可以防止每次都覆盖文件,但是,输出只包含一张表

标签: python queue locking multiprocessing


【解决方案1】:

1) 为什么你在第二个方法的几个地方实现了 time.sleep?

__main__time.sleep(0.1) 中,给已启动的process 一个时间片以启动。
f2(fq, q) 中,给queue 一个时间片以将所有缓冲数据刷新到管道和 使用q.get_nowait()
w(q) 中,仅用于测试模拟writer.to_excel(...) 的长期运行, 我删除了这个。

2) pool.map 和 pool = [mp.Process( . )] 有什么区别?

使用pool.map 不需要Queue,不传递参数,代码更短。 worker_process 必须立即返回 result 并终止。 只要所有iteration 都完成,pool.map 就会开始一个新进程。 之后必须处理results

使用pool = [mp.Process( . )],启动n processesprocess 终止于 queue.Empty

您能想出一种情况,您更喜欢一种方法而不是另一种方法吗?

方法一:快速设置,序列化,只对结果感兴趣继续。
方法 2:如果您想并行完成所有工作负载。


不能在进程中使用 global writer
writer 实例必须属于一个 process

mp.Pool的用法,例如:

def f1(k):
  # *** DO SOME STUFF HERE***
  results = pd.DataFrame(df_)
  return results

if __name__ == '__main__':
    pool = mp.Pool()
    results = pool.map(f1, range(len(list_of_days)))

    writer = pd.ExcelWriter('../test/myfile.xlsx', engine='xlsxwriter')
    for k, result in enumerate(results):
        result.to_excel(writer, sheet_name=list_of_days[k])

    writer.save()
    pool.close()

这导致.to_excel(...)__main__ 进程中被依次调用。


如果你想要并行.to_excel(...),你必须使用mp.Queue()
例如:

worker 进程:

# mp.Queue exeptions have to load from
try:
    # Python3
    import queue
except:
    # Python 2
    import Queue as queue

def f2(fq, q):
    while True:
        try:
            k = fq.get_nowait()
        except queue.Empty:
            exit(0)

        # *** DO SOME STUFF HERE***

        results = pd.DataFrame(df_)
        q.put( (list_of_days[k], results) )
        time.sleep(0.1)  

writer 进程:

def w(q):
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    while True:
        try:
            titel, result = q.get()
        except ValueError:
            writer.save()
            exit(0)

        result.to_excel(writer, sheet_name=titel)

__main__ 进程:

if __name__ == '__main__':
    w_q = mp.Queue()
    w_p = mp.Process(target=w, args=(w_q,))
    w_p.start()
    time.sleep(0.1)

    f_q = mp.Queue()
    for i in range(len(list_of_days)):
        f_q.put(i)

    pool = [mp.Process(target=f2, args=(f_q, w_q,)) for p in range(os.cpu_count())]
    for p in pool:
        p.start()
        time.sleep(0.1)

    for p in pool:
        p.join()

    w_q.put('STOP')
    w_p.join()

用 Python:3.4.2 - pandas:0.19.2 - xlsxwriter:0.9.6 测试

【讨论】:

  • 谢谢@stovfl!您的两种方法都可以正常工作。有趣的是,看看如何通过使用队列来避免锁定。问题:1)为什么在第二种方法中的几个地方实现 time.sleep ? 2) pool.map 和 pool = [mp.Process( . )] 有什么区别?您能想到一种情况,您更喜欢一种方法而不是另一种方法吗?
  • @SuperMartingale:更新答案。请反馈:有多少processes,没有multiprocessing,您的节省了多少时间。使用此信息更新您的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-09
  • 2012-11-06
  • 1970-01-01
  • 1970-01-01
  • 2017-07-20
  • 2022-07-06
相关资源
最近更新 更多