【问题标题】:Find the min and max value of second dataframe between 2 dates given by first dataframe在第一个数据框给出的两个日期之间查找第二个数据框的最小值和最大值
【发布时间】:2020-07-04 19:12:58
【问题描述】:

我有这 2 个虚拟数据帧

np.random.seed(12345)

df1=pd.DataFrame({'name'    :  ['A']*4+['B']*4,
                'start_date':   pd.to_datetime(['2000-03-15', '2000-06-12','2000-09-01', '2001-01-17','2000-03-19', '2000-06-14','2000-09-14', '2001-01-22']),
                'end_date':pd.to_datetime(['2000-06-12','2000-09-01', '2001-01-17','2001-03-19', '2000-06-14','2000-09-14', '2001-01-22','2001-02-01'])})

date=pd.date_range('2000-01-01','2002-01-01')
name=['A']*len(date)+['B']*len(date)
date=date.append(date)
import numpy as np
low=np.random.rand(len(date))
high=low+np.random.rand(len(date))
df2=pd.DataFrame({'name': name, 'date': date, 'low':low,'high':high})

对于 df1 中的每一行,我都有名字、开始日期和结束日期。

我想找到与名称相同且在 df2 中的开始日期和结束日期之间的最大值和最小值

以下是我目前的解决方案。

df1=df1.set_index('name')
df2=df2.set_index(['name','date'])
df2=df2.sort_index()
df1['max']=-1
df1['min']=-1
for name in df1.index.unique():
    df=df2.loc[name]
    tmphigh=[]
    tmplow=[]
    for (_,start_date,end_date,_,_) in df1.loc[name].itertuples(name=None):
        newdf=df.iloc[df.index.searchsorted(start_date): df.index.searchsorted(end_date)]
        tmphigh.append(newdf.high.max())
        tmplow.append(newdf.low.min())
    df1.loc[[name],['max']]=tmphigh
    df1.loc[[name],['min']]=tmplow

但是,应用超过百万行仍然需要相当长的时间。 我想知道是否有更快的方法。

[编辑]: 感谢 Pramote Kuacharoen,我能够调整他的一些代码,并在现有代码的基础上实现 6 倍的加速。

分离成循环的原因是我发现在apply函数中包含df2[name]的生成会导致计算时间显着增加。

因此我分开计算它可能有助于减少函数调用以提取 df2 中 name 下的所有值。

如果有人能提出比我的方法更好的方法,我会很高兴。但这对我来说已经足够了。

以下是我目前的解决方案

from tqdm import tqdm
df1a=df1.groupby('name')
df2a=df2.groupby('name')
mergedf=df1
mergedf['maximum']=-1
mergedf['minimum']=-1
def get_min_max(row):
    dfx=df2x.iloc[df2x.index.searchsorted(row['start_date']): df2x.index.searchsorted(row['end_date'])]
    maximum = dfx['high'].max()
    minimum = dfx['low'].min() 
    return pd.Series({'maximum': maximum, 'minimum': minimum})
for name,df in tqdm(df1a):
    df2x=df2a.get_group(name)
    mergedf.loc[[name],['maximum','minimum']]=df.apply(get_min_max,axis=1)

【问题讨论】:

    标签: python pandas dataframe


    【解决方案1】:
    import pandas as pd
    df1=pd.DataFrame({'name'    :  ['A']*4+['B']*4,
                    'start_date':   pd.to_datetime(['2000-03-15', '2000-06-12','2000-09-01', '2001-01-17','2000-03-19', '2000-06-14','2000-09-14', '2001-01-22']),
                    'end_date':pd.to_datetime(['2000-06-12','2000-09-01', '2001-01-17','2001-03-19', '2000-06-14','2000-09-14', '2001-01-22','2001-02-01'])})
    
    date=pd.date_range('2000-01-01','2002-01-01')
    name=['A']*len(date)+['B']*len(date)
    date=date.append(date)
    import numpy as np
    low=np.random.rand(len(date))
    high=low+np.random.rand(len(date))
    df2=pd.DataFrame({'name': name, 'date': date, 'low':low,'high':high})
    
    df2 = df2.set_index('date')
    
    def find_max(row):
        return df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], 'high'].max()
    
    def find_min(row):
        return df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], 'low'].min()
    
    df1['maximum'] = df1.apply(find_max, axis=1)
    df1['minimum'] = df1.apply(find_min, axis=1)
    

    尝试在一次调用中找到最小值和最大值。它可能会节省一些时间。

    def find_min_max(row):
        dfx = df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], ['high', 'low']]
        maximum = dfx['high'].max()
        minimum = dfx['low'].min()
        return pd.Series({'maximum': maximum, 'minimum': minimum})
    
    df1.merge(df1.apply(find_min_max, axis=1), left_index=True, right_index=True)
    

    试试这个:多处理和共享内存。将其保存在 .py 文件中并使用命令行运行它。它应该快得多。我将 n_workers 设置为 4。您可以更改它。

    import numpy as np
    import pandas as pd
    from multiprocessing.shared_memory import SharedMemory
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    
    def find_min_max(name, data_info):
    
        shm_name, shape, dtype = data_info[0]
        shm1 = SharedMemory(shm_name)
        np1 = np.recarray(shape=shape, dtype=dtype, buf=shm1.buf)
    
        shm_name, shape, dtype = data_info[1]
        shm2 = SharedMemory(shm_name)
        np2 = np.recarray(shape=shape, dtype=dtype, buf=shm2.buf)
    
        data1 = np1[np1['name'] == name]
        data2 = np2[np2['name'] == name]
    
        for rec in data1:
            idx1 = np.searchsorted(data2['date'], rec['start_date'])
            idx2 = np.searchsorted(data2['date'], rec['end_date'])
            data = data2[idx1:idx2]
            np1[rec['index']]['maximum'] = data['high'].max()
            np1[rec['index']]['minimum'] = data['low'].min()
    
    
    def main():
    
        np.random.seed(12345)
    
        df1 = pd.DataFrame({'name':  ['A']*4+['B']*4,
                            'start_date':   pd.to_datetime(['2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17', '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22']),
                            'end_date': pd.to_datetime(['2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19', '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01'])})
    
        date = pd.date_range('2000-01-01', '2002-01-01')
        name = ['A']*len(date)+['B']*len(date)
        date = date.append(date)
        low = np.random.rand(len(date))
        high = low+np.random.rand(len(date))
        df2 = pd.DataFrame({'name': name, 'date': date, 'low': low, 'high': high})
    
        df1 = df1.sort_values('name')
        df2 = df2.sort_values(['name', 'date'])
        df1['maximum'] = -1.0
        df1['minimum'] = -1.0
    
        np1 = df1.to_records(column_dtypes={
            'name': '|S20', 'start_date': '<M8[ns]', 'end_date': '<M8[ns]'})
        np2 = df2.to_records(column_dtypes={
            'name': '|S20', 'date': '<M8[ns]', 'low': '<f8', 'high': '<f8'})
    
        names = [str.encode(name) for name in df1['name'].unique()]
        del df1
        del df2
    
        shm1 = SharedMemory(name='d1', create=True, size=np1.nbytes)
        shm2 = SharedMemory(name='d2', create=True, size=np2.nbytes)
    
        shm1_np_array = np.recarray(
            shape=np1.shape, dtype=np1.dtype, buf=shm1.buf)
        np.copyto(shm1_np_array, np1)
        shm2_np_array = np.recarray(
            shape=np2.shape, dtype=np2.dtype, buf=shm2.buf)
        np.copyto(shm2_np_array, np2)
    
        data_info = [
            (shm1.name, np1.shape, np1.dtype),
            (shm2.name, np2.shape, np2.dtype)
        ]
    
        del np1
        del np2
    
        # Set number of workers
        n_workers = 4
    
        with ProcessPoolExecutor(n_workers) as exe:
            fs = [exe.submit(find_min_max, name, data_info)
                  for name in names]
            for _ in as_completed(fs):
                pass
    
        print(shm1_np_array)
    
        shm1.close()
        shm2.close()
        shm1.unlink()
        shm2.unlink()
    
    
    if __name__ == "__main__":
        main()
    

    【讨论】:

    • 对不起,这个方法的速度似乎比我幼稚的循环慢……我创建的循环能够在一小时内完成。但是我已经运行了一个多小时的 apply 方法,它仍然没有完成。不确定我是否做错了什么......但我认为这可能是由于应用程序在循环提取该操作时为每行迭代调用掩码。
    • 您的数据有多大?
    • df1 有 50 万行,df2 有 3700 万行
    • 只尝试 df1.apply(find_min_max, axis=1) 看看运行需要多长时间。
    • 您可以考虑的其他选项:对数据进行分区,使用多处理。
    【解决方案2】:

    由于性能是您的问题,我认为 dask 可以提供很大帮助

    import pandas as pd
    import numpy as np
    import dask.dataframe as dd
    

    创建 dask df

    ddf1 = dd.from_pandas(df1, npartitions=5) # You can change 5 to higher
    

    你的逻辑函数

    def get_high_low(name, start, end):
        mask = df2['name'].eq(name) & df2['date'].between(start, end)
        low = df2.loc[mask]['low'].min()
        high = df2.loc[mask]['high'].max()
        return {'name': name, 'start_date': start, 'end_date': end, 'max': high, 'min': low}
    

    将结果数据收集到一个新的数据帧中

    result = ddf1.apply(lambda x: testing(x['name'], x['start_date'], x['end_date']), axis=1, meta=(None, 'object')).compute()
    df4 = pd.DataFrame(result.tolist())
    

    【讨论】:

    • 感谢您介绍 dask.dask 似乎符合我的目的。但是对于这种情况,我无法通过您的回答让它在我的数据集上工作。我已在您的解决方案中将测试替换为 get_high_low 。我添加了诊断,但它没有执行任何任务。 ~~~ from dask.diagnostics import ProgressBar ProgressBar().register()
    • 您遇到错误了吗?我的解决方案适用于您提供的示例
    • 没有错误。当我将 df1 的大小从 1k 行和 df2 减少到 100 万行时,它工作正常。但更大会导致类似的加载时间增加,或者它只是没有进展
    • 这听起来像是硬件问题,你的电脑即使使用多重处理也无法处理这么大的文件
    猜你喜欢
    • 2021-03-25
    • 1970-01-01
    • 2019-02-26
    • 2021-09-23
    • 1970-01-01
    • 2016-07-18
    • 1970-01-01
    • 2019-08-28
    • 1970-01-01
    相关资源
    最近更新 更多