【问题标题】:Parallelizing a for loop in python在python中并行化for循环
【发布时间】:2025-12-29 08:40:11
【问题描述】:

我有一个字典,其中每个键(日期)都包含一个表(格式为[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], ... 的多个列表。我想将其转换为 DataFrame;这是通过以下代码完成的:

df4 = pd.DataFrame(columns=sorted(set_days))

for date in dic.keys():
        days = [day  for day, val1, val2  in dic[date]]
        val1 = [val1 for day, val1, val2  in dic[date]]
        df4.loc[date, days] = val1

这段代码运行良好,但运行需要两个多小时。 经过一番研究,我意识到我可以通过 multiprocessing 库将其并行化;以下代码是预期的并行版本

import multiprocessing

def func(date):
    global df4, dic
    days = [day  for day, val1, val2  in dic[date]]
    val1 = [val1 for day, val1, val2  in dic[date]]
    df4.loc[date, days] = val1

multiprocessing.Pool(processes=8).map(func, dic.keys())

这段代码的问题是,在执行multiprocessing.Pool(processes...之后,df4的DataFrame是空的。

任何帮助将不胜感激。

示例

假设字典包含两天:

dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]

dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]

那么DataFrame应该是这样的:

df4.loc[:, 1:50]
             1    2    3    4    5   ...     46   47   48     49     50
20030812  24.25  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN    NaN  22.85
20030813  24.23  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN  22.74    NaN

还有,

dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])

df1.head().to_dict()
Out: 
{1: {'20030812': 24.25, '20030813': 24.23},
 2: {'20030812': nan, '20030813': nan},
 3: {'20030812': nan, '20030813': nan},
 4: {'20030812': nan, '20030813': nan},
 5: {'20030812': nan, '20030813': nan},
 6: {'20030812': nan, '20030813': nan},
 7: {'20030812': nan, '20030813': nan},
 8: {'20030812': nan, '20030813': nan},
 9: {'20030812': nan, '20030813': nan},
 10: {'20030812': nan, '20030813': nan},
 11: {'20030812': nan, '20030813': nan},
 12: {'20030812': nan, '20030813': nan},
 13: {'20030812': nan, '20030813': nan},
 14: {'20030812': nan, '20030813': nan},
 15: {'20030812': nan, '20030813': nan},
 16: {'20030812': nan, '20030813': nan},
 17: {'20030812': nan, '20030813': nan},
 18: {'20030812': nan, '20030813': nan},
 19: {'20030812': nan, '20030813': 23.4},
 20: {'20030812': 23.54, '20030813': nan},
 21: {'20030812': nan, '20030813': nan},
 22: {'20030812': nan, '20030813': nan},
 23: {'20030812': nan, '20030813': nan},
 24: {'20030812': nan, '20030813': nan},
 25: {'20030812': nan, '20030813': nan},
 26: {'20030812': nan, '20030813': nan},
 27: {'20030812': nan, '20030813': nan},
 28: {'20030812': nan, '20030813': nan},
 29: {'20030812': nan, '20030813': nan},
 30: {'20030812': 23.13, '20030813': 22.97},
 31: {'20030812': nan, '20030813': nan},
 32: {'20030812': nan, '20030813': nan},
 ...

【问题讨论】:

  • 我认为这可能是一个 XY 问题。也许您根本不需要循环,也不需要多处理池。你的df.head().to_dict() 是什么,你的dic.keys 是什么? (发布示例)
  • 你是不是故意忽略val2?此外,是否正在改变您获得dict 选项的方式?

标签: python python-3.x multiprocessing


【解决方案1】:

要回答您的原始问题(大致:“为什么df4 DataFrame 是空的?”),这不起作用的原因是当Pool 工作人员启动时,每个工作人员都会继承一个个人副本- 写入父数据的视图(如果multiprocessing 运行在具有fork 的类UNIX 系统上,则直接写入视图,或者在Windows 上运行时通过kludgy 方法模拟它)。

因此,当每个工人这样做时:

 df4.loc[date, days] = val1

它正在改变工人的df4 的个人副本;父进程的副本保持不变。

一般来说,有以下三种处理方式:

  1. 将您的工作函数更改为 return 可以在父进程中使用的东西。例如,不要尝试使用df4.loc[date, days] = val1 执行就地突变,而是返回在父级中执行该操作所需的内容,例如return date, days, val1,然后将父级改为:

    for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
        df4.loc[date, days] = val
    

    这种方法的缺点是它需要对每个返回值进行腌制(Python 的序列化版本)、从子级到父级的管道以及取消腌制;如果工作任务没有做很多工作,特别是如果返回值很大(在这种情况下,似乎就是这种情况),它很容易花更多的时间在序列化和 IPC 上而不是在并行度上获得的时间。

  2. 使用共享对象/内存(在 this answer to "Multiprocessing writing to pandas dataframe" 中演示)。在实践中,这通常不会给你带来太多好处,因为不基于the more "raw" ctypes sharing using multiprocessing.sharedctypes 的东西最终仍然需要将数据从一个进程传输到另一个进程;不过,基于sharedctypes 的东西可以获得有意义的速度提升,因为一旦映射,共享原始 C 数组的访问速度几乎与本地内存一样快。

  3. 如果被并行化的工作是受 I/O 限制的,或者使用第三方 C 扩展来进行受 CPU 限制的工作(例如 numpy),尽管 GIL 干扰和线程,您可能能够从线程获得所需的速度提升 共享相同的内存。您的案例似乎不是 I/O 绑定或有意义地依赖于可能释放 GIL 的第三方 C 扩展,所以它在这里可能无济于事,但总的来说,从基于进程的并行性切换的简单方法到基于线程的并行性(当您已经在使用 multiprocessing 时)是将 import 从:

    import multiprocessing
    

    import multiprocessing.dummy as multiprocessing
    

    它以预期的名称导入the thread-backed version of multiprocessing,因此代码可以无缝地从使用进程切换到线程。

【讨论】:

  • 显然,如果您真的不需要并行性,正如您的回答所示,正确的解决方案是修复导致减速的算法选择并避免无意义的并行性,我只是想解释一下原因如果您将来需要并行化,您的问题可能会有所帮助。
【解决方案2】:

正如 RafaelC 所暗示的,这是一个 XY 问题。 在没有多重处理的情况下,我已经能够将执行时间减少到 20 秒。

我创建了一个替换字典的 lista 列表,而不是在 df4 DataFrame 中为每个日期添加一行,一旦 lista 已满,我将 lista 转换为 DataFrame。

# Returns the largest day from  all the dates (each date has a different number of days)
def longest_series(dic):
    largest_series = 0
    for date in dic.keys():
        # get the last day's table of a specific date
        current_series = dic[date][-1][0]
        if largest_series < current_series:
            largest_series = current_series
    return largest_series


ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)

# creating lista list, lista is similar to dic 
#The difference is that, in lista, every date has the same number of days 
#i.e. from 1 to ls, and it does not contain the dates.

# It takes 15 seconds
lista = list()
for date in dic.keys():
    present_days = list()
    presen_values = list()
    for day, val_252, _ in dic[date]:
        present_days.append(day)
        presen_values.append(val_252)

    missing_days = list(s_total_days.difference(set(present_days))) # extra days added to date
    missing_values = [None] * len(missing_days)                     # extra values added to date
    all_days_index = list(np.argsort(present_days + missing_days))  # we need to preserve the order between days and values
    all_day_values = presen_values + missing_values  
    lista.append(list(np.array(all_day_values)[all_days_index]))


# It takes 4 seconds
df = pd.DataFrame(lista, index= dic.keys(), columns=l_total_days)

【讨论】: