【问题标题】:Speed up or vectorize pandas apply function - require a conditional application of a function加速或矢量化 pandas 应用函数 - 需要有条件地应用函数
【发布时间】:2018-09-19 16:25:37
【问题描述】:

我想将函数逐行应用于如下所示的数据框:

name  value 

'foo' 2
'bar' 4
'bar' 3
'foo' 1
  .   .
  .   .
  .   .
'bar' 8

速度对我来说很重要,因为我在多个 90GB 数据集上进行操作,因此我一直在尝试对以下操作进行矢量化以在 df.apply 中使用:

以“名称”为条件,我想将“值”插入一个单独的函数,对结果执行一些算术运算,然后写入一个新列“输出”。类似的,

funcs = {'foo': <FunctionObject>, 'bar': <FunctionObject>}

def masterFunc(row):
    correctFunction = funcs[row['name']]
    row['output'] = correctFunction(row['value']) + 3*row['value']

df.apply(masterFunc, axis=1).

在我真正的问题中,我有 32 个不同的函数可以根据“名称”应用于“值”。这些单独的函数(fooFunc、barFunc、zooFunc 等)中的每一个都已经被矢量化了;它们是这样构建的 scipy.interp1d 函数:

separateFunc = scipy.interpolate.interp1d(x-coords=[2, 3, 4], y-coords=[3, 5, 7])
#separateFunc is now a math function, y=2x-1. use case:
y = separateFunc(3.5) # y == 6

但是,我不确定如何对 masterFunc 本身进行矢量化。似乎选择将哪个函数“拉出”以应用于“值”非常昂贵,因为它需要在每次迭代时进行内存访问(使用我当前将函数存储在哈希表中的方法)。然而,替代方案似乎只是一堆 if-then 语句,这似乎也无法向量化。如何加快速度?

为简洁起见,删除了重复部分的实际代码:

interpolationFunctions = {}
#the 'interpolate.emissionsFunctions' are a separate function which does some scipy stuff
interpolationFunctions[2] = interpolate.emissionsFunctions('./roadtype_2_curve.csv')
interpolationFunctions[3] = interpolate.emissionsFunctions('./roadtype_3_curve.csv')

def compute_pollutants(row):
    funcs = interpolationFunctions[row['roadtype']]
    speed = row['speed']
    length = row['length']
    row['CO2-Atm'] = funcs['CO2-Atm'](speed)*length*speed*0.00310686368
    row['CO2-Eq'] = funcs['CO2-Eq'](speed)*length*speed*0.00310686368
    return row

【问题讨论】:

  • 通过像这样保持函数分离,除了使用 apply 之外几乎没有什么可做的。但是,根据功能的不同,您可以通过numba 使用jit 执行某些操作。如果您愿意分享实际功能,您应该这样做。您也可以 cythonize,但同样,这取决于功能(我认为)。甚至分享其中的 2 或 3 个,以便我们进行演示。
  • 嗨@piRSquared 感谢您的回复!我的单独函数是 scipy 插值函数(在原始问题中添加)。我可以用它做些什么吗?
  • 我没有看到任何功能。
  • 抱歉,刚刚添加完毕。每个单独的Func 都是根据预先存在的数据预先构建的; scipy.interpolate 将它们作为函数对象返回。
  • 你所有的功能都是这样的吗?再给我看几张。对于通用函数对象,您可能不会得到一个简单的答案。但是我们也许可以对函数本身做一些事情。我当然可以重写一个插值函数。问题是,我们能否写出一个动态的。也许,我需要了解更多您的其他功能。

标签: python pandas scipy python-multiprocessing data-science


【解决方案1】:

尝试创建一个可重现的示例来概括您的问题。您可以运行具有不同行大小的代码来比较不同方法之间的结果,将这些方法之一扩展到使用 cython 或多处理以获得更快的速度也不难。您提到您的数据非常大,我没有测试每种方法的内存使用情况,因此值得在您自己的机器上尝试。

import numpy as np
import pandas as pd
import time as t

# Example Functions
def foo(x):
    return x + x

def bar(x):
    return x * x

# Example Functions for multiple columns
def foo2(x, y):
    return x + y

def bar2(x, y):
    return x * y

# Create function dictionary
funcs = {'foo': foo, 'bar': bar}
funcs2 = {'foo': foo2, 'bar': bar2}

n_rows = 1000000
# Generate Sample Data
names = np.random.choice(list(funcs.keys()), size=n_rows)
values = np.random.normal(100, 20, size=n_rows)
df = pd.DataFrame()
df['name'] = names
df['value'] = values

# Create copy for comparison using different methods
df_copy = df.copy()

# Modified original master function
def masterFunc(row, functs):
    correctFunction = funcs[row['name']]
    return correctFunction(row['value']) + 3*row['value']

t1 = t.time()
df['output'] = df.apply(lambda x: masterFunc(x, funcs), axis=1)
t2 = t.time()
print("Time for all rows/functions: ", t2 - t1)


# For Functions that Can be vectorized using numpy
t3 = t.time()
output_dataframe_list = []
for func_name, func in funcs.items():
    df_subset = df_copy.loc[df_copy['name'] == func_name,:]
    df_subset['output'] = func(df_subset['value'].values) + 3 * df_subset['value'].values
    output_dataframe_list.append(df_subset)

output_df = pd.concat(output_dataframe_list)

t4 = t.time()
print("Time for all rows/functions: ", t4 - t3)


# Using a for loop over numpy array of values is still faster than dataframe apply using
t5 = t.time()
output_dataframe_list2 = []
for func_name, func in funcs2.items():
    df_subset = df_copy.loc[df_copy['name'] == func_name,:]
    col1_values = df_subset['value'].values
    outputs = np.zeros(len(col1_values))
    for i, v in enumerate(col1_values):
        outputs[i] = func(col1_values[i], col1_values[i]) + 3 * col1_values[i]

    df_subset['output'] = np.array(outputs)
    output_dataframe_list2.append(df_subset)

output_df2 = pd.concat(output_dataframe_list2)

t6 = t.time()
print("Time for all rows/functions: ", t6 - t5)

【讨论】:

    猜你喜欢
    • 2015-01-30
    • 2014-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-04
    • 2020-08-03
    • 2016-11-07
    • 2017-04-06
    相关资源
    最近更新 更多