【问题标题】:Adding additional random parameter as an argument in pool.map function in python 3.4.7在 python 3.4.7 的 pool.map 函数中添加额外的随机参数作为参数
【发布时间】:2023-04-06 07:02:01
【问题描述】:

我想在大型数据集上使用多重处理来查找两列的乘积,并使用参数中的给定参数过滤数据集。我构建了一个测试集,但我无法让多处理在这个集上工作。

首先,我试图在parallelize_dataframe 函数中划分数据集,然后在subset_col 函数中应用乘法函数和过滤函数。稍后我将完整的数据集附加回 parallelize_dataframe。

import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Lock

df = pd.DataFrame({'col1': [1, 0, 1, 1, 1, 0, 0, 1, 0, 1],
                'col2': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'col3': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'col4': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def subset_col(df, p):
    print("Working with number: " + str(p))
    df[col5] = df[col3]*df[col4]
    df= df[df['col1'] == p]


def parallelize_dataframe(df, p, func, n_cores=80):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split, p))
    pool.close()
    pool.join()
    return df


df3 = parallelize_dataframe(df,1,subset_col)


结果应该是 col3 和 col4 的乘积,其中 col1 用一个值过滤。但我总是收到一条错误消息:

File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in parallelize_dataframe
struct.error: 'i' format requires -2147483648 <= number <= 2147483647 

但是,如果我从所有功能中删除过滤器“p”,它就可以正常工作。有人可以帮我调试一下吗?

【问题讨论】:

    标签: python pandas numpy python-multiprocessing multiprocess


    【解决方案1】:

    来自multiprocessing.Pool.map 的官方文档,它“支持只支持一个iterable 参数”。因此,您需要更改subset_col 的接口以采用单个参数。此外,您忘记创建列字符串,导致名称错误。为了减少计算量,您应该在乘法之前进行过滤。然后应该返回一个值,除非您的函数仅通过副作用操作(我假设您不希望这样做,因为您连接了池结果)。

    def subset_col(pair):
        df, p = pair
        print("Working with number: " + str(p))
        df = df[df['col1'] == p].copy()
        df['col5'] = df['col3']
        return df
    

    接下来,我们需要修正您调用pool.map 的方式,因为根据您的操作,它应该只需要两个参数(第三个,最后一个参数是块大小)。由于您希望每个进程使用相同的p,因此我们将dfs 压缩在一起,每个进程的重复值p。另外,考虑使用上下文管理器来处理关闭资源。

    def parallelize_dataframe(df, p, func, n_cores=None):
        if n_cores is None:
            n_cores = os.cpu_count()
    
        dfs = np.array_split(df, n_cores)
        pairs = zip(dfs, itertools.repeat(p))
        with Pool(n_cores) as pool:
            result = pool.map(func, pairs)
    
        df = pd.concat(result)
        return df
    

    这现在可以正确返回新的数据帧。但我绝对怀疑你有一台 80 核的机器。考虑实现n_cores=NonePython dynamically figure out 使用os.cpu_count 让你的机器上有多少个内核

    df3 = parallelize_dataframe(df, 1, subset_col)
    

    根据您对Pool.starmap 变体的要求:

    def subset_col(df, p):
        # remove unpacking line
        ...
    
    def parallelize_dataframe(df, p, func, n_cores=None):
        ...
        # change `pool.map(...)` to `pool.starmap(...)`
        ...
    

    但是您应该注意,Pool 不提供 imapimap_unordered 替代 starmap,这两个都是惰性评估版本,不同的是是否保留顺序。

    【讨论】:

    • 谢谢,马特。我正在使用 ec2 实例 m5.24xlage。所以,我有大约 96 个内核可供使用。我面临的问题是数据帧太大了,我必须找到一种多进程的方法来尽可能减少实现时间。星图不应该允许多个可迭代参数。我经常使用 R,并且熟悉 foreach 和 doParallel。尝试在 python 中发现类似的功能。
    • 我已经为starmap varint 更新了上面的帖子。但是,对于如此庞大的数据集,您可能需要考虑使用 Pandas 以外的其他工具,例如 Dask。它被设计为具有与 Pandas 类似的 API,并且专门用于大型计算。
    猜你喜欢
    • 2021-10-17
    • 1970-01-01
    • 1970-01-01
    • 2021-11-30
    • 1970-01-01
    • 1970-01-01
    • 2022-06-30
    • 2012-11-16
    • 2019-03-17
    相关资源
    最近更新 更多