【问题标题】:Multiprocessing.pool with a function that has multiple args and kwargs具有多个 args 和 kwargs 的函数的 Multiprocessing.pool
【发布时间】:2020-01-15 17:32:48
【问题描述】:

我想使用 mutliprocessing.pool 方法并行计算。问题是我想在计算中使用的函数提供了两个 args 和可选的 kwargs,第一个参数是数据帧,第二个参数是 str,任何 kwargs 是字典。

对于我尝试执行的所有计算,我想要使用的数据框和字典都是相同的,只有第二个 arg 是不断变化的。因此,我希望能够使用 map 方法将它作为不同字符串的列表传递给带有 df 和 dict 的已打包函数。

from utils import *
import multiprocessing
from functools import partial



def sumifs(df, result_col, **kwargs):

    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if type(kwargs[col]) == tuple:
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if (cache_key in cache):
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    result_col= ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    print(results)

但是,当我运行上面的代码时,我收到以下错误:TypeError: sumifs() missing 1 required positional argument: 'result_col'。如何为函数提供第一个 arg 和 kwargs,同时提供第二个参数作为 str 列表,以便我可以并行化计算?我在论坛中阅读了几个类似的问题,但似乎没有一个解决方案适用于这种情况......

谢谢你,如果有什么不清楚的地方,我很抱歉,我今天才知道多处理包!

【问题讨论】:

  • 尝试发送带有 ** 前缀的 keywords 参数,不带 kwargs 键。此外,请查看以下链接以获取有关调用 Pool.map 函数的更多信息。 stackoverflow.com/questions/59611745/…
  • @Amiram 我已经尝试过了,但它会产生相同的错误:` sumifs() missing 1 required positional argument: 'result_col' `

标签: python multiprocessing pool args keyword-argument


【解决方案1】:

让我们看一下您的代码的两部分。

首先是sumifs函数声明:

def sumifs(df, result_col, **kwargs):

其次,使用相关参数调用这个函数。

# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}

# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)

更新 1:

原代码修改后,貌似是位置参数赋值的问题,尝试丢弃。

换行:

results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)

与:

results = pool.map(partial(sumifs, ca, **keywords), result_col)

示例代码:

import multiprocessing
from functools import partial

def test_func(arg1, arg2, **kwargs):
    print(arg1)
    print(arg2)
    print(kwargs)
    return arg2

if __name__ == '__main__':
    list_of_args2 = [1, 2, 3]
    just_a_dict = {'key1': 'Some value'}
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
    print(results)

将输出:

This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
['1', '2', '3']

更多Multiprocessing.pool with a function that has multiple args and kwargs的例子


更新 2:

扩展示例(由于 cmets):

然而,我想知道,以同样的方式,如果我的函数有三个 args 和 kwargs,并且我想保持 arg1、arg3 和 kwargs 不变,我如何将 arg2 作为多处理列表传递?本质上,我将如何指示多处理 map(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) 第二个值对应于 arg3 而不是 arg2?

Update 1 代码将发生如下变化:

# The function signature
def test_func(arg1, arg2, arg3, **kwargs):

# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)

这可以使用python 位置和关键字赋值来完成。 请注意,kwargs 被放在一边,并且没有使用 keyword 进行分配,尽管它位于 keyword 分配值之后。

有关参数分配差异的更多信息可以找到here

【讨论】:

  • 嗨@Amiram,我刚刚意识到我忘了提到tasks 是我的result_col 参数!因此麻烦...不过非常感谢! p.s.:我刚刚编辑了这个问题,这样就更清楚了!
  • 您还有问题吗?
  • 是的,仍然没有弄清楚如何将第一个参数和 kwargs(它们是常量)传递给函数,并将第二个参数作为多处理的可迭代对象。
  • 但是我想知道,以同样的方式,如果我的函数有三个 args 和 kwargs,并且我想保持 arg1、arg3 和 kwargs 不变,我怎么能将 arg2 作为多处理列表传递?本质上,我将如何指示 map(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) 部分中的第二个值对应于 arg3 而不是 arg2 的多处理?
  • 我已经对原始答案添加了另一个更新。
【解决方案2】:

如果有一条数据在所有工作/作业中是恒定的/固定的,那么最好在创建池期间使用此固定数据“初始化”池中的进程并映射变化的数据.这避免了在每个作业请求中重新发送固定数据。在您的情况下,我会执行以下操作:

df = None
kw = {}

def initialize(df_in, kw_in):
    global df, kw
    df, kw = df_in, kw_in

def worker(data):
    # computation involving df, kw, and data
    ...

...
    with multiprocessing.Pool(max_number_processes, intializer, (base, keywords)) as pool:
        pool.map(worker, varying_data)

gist 包含使用初始化程序的完整示例。这个blog post 解释了使用初始化程序的性能提升。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-15
    • 1970-01-01
    • 2020-09-16
    • 2017-01-28
    • 1970-01-01
    • 2021-07-26
    • 2022-09-22
    • 1970-01-01
    相关资源
    最近更新 更多