【问题标题】:Pandas: Time-weighted rolling average on ragged time seriesPandas:不规则时间序列的时间加权滚动平均值
【发布时间】:2020-08-20 17:38:38
【问题描述】:

我有一个参差不齐(意思是非常规频率)、时间索引的 DataFrame,我想对其执行时间加权滚动平均,以保持 DataFrame 的原始索引。假定记录的值在被另一个值取代之前是有效的。实现这一点的一种方法是将参差不齐的 DataFrame 上采样到统一频率,然后进行滚动平均:

import pandas as pd
import numpy as np


def time_weighted_average_using_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    # Leads to high memory usage
    original_index = df.index.copy()
    avg = (
        df.resample("1s")
        .ffill()
        .rolling(avg_window, closed="left", min_periods=int(avg_window[0])))
        .mean()
        .reindex(original_index)
    )
    return avg


if __name__ == "__main__":
    df = pd.DataFrame(
        {"A": [0, 1, 2, 3, 4, 5]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )

    expected_avg = pd.DataFrame(
        {"A": [np.nan, np.nan, 1 / 3, 5 / 3, 7 / 3, 4]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )

    pd.testing.assert_frame_equal(
        time_weighted_average_using_upsampling(df=df, avg_window="3s"), expected_avg
    )

这个问题是上采样破坏了参差不齐的 df 提供的稀疏表示的目的。稀疏表示是内存高效的,而上采样版本则不是。这就引出了一个问题:如何在不必对整个df进行上采样的情况下获得上面显示的结果?

【问题讨论】:

  • 这是一个有趣的问题,如果您的问题是不立即对整个数据帧进行上采样,那么也许您可以考虑仅对最后 3 行进行上采样并像 df.rolling(3, closed='left').apply(lambda x: x.resample("1s").ffill()[-4:-1].mean()) 一样进行计算,但这赢了我猜这不是很有效,因为你在最后执行了许多上采样
  • 我也考虑过这种方法。它将处理内存限制,但存在实际挑战,其中一个我不知道如何解决。 df.rolling().apply() 除了样本数组之外不提供可调用的任何内容,因此您不能使用 df 操作。
  • 也许我误解了你说的数组,但是df.rolling().apply() 确实将一个系列传递给它,而不是现在的一个 numpy 数组。参数raw has been added in v0.23,似乎在这个早期版本中默认设置为True,但由于似乎默认是False,它传递了一个Serie,因此索引可用
  • 你完全正确!我的错。我使用的是稍旧版本的 pandas。

标签: python pandas dataframe rolling-computation


【解决方案1】:

这是一种替代方法,您可以先检查两行之间的时间差异大于间隙的位置,而不是对整个数据帧进行上采样。然后将 3s 删除到有间隙的行和 reindex df 以及这些特定新时间戳的联合。创建这些行后,您可以 groupby 使用添加新索引的位置,resample 每组 1s,最后 rolling 使用您所做的方法。 Reindex 以 df 结尾。

rule = 3
rolling_win = f'{rule}s'

sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)
print(new_timestamps) 
#DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)

#reindex with the new 
df_ = df.reindex(df.index.union(new_timestamps))

#perform first the resample 1s per group, then clean the dataframe to do the rolling.mean
#finally reindex like original df
df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())
          .resample("1s").ffill()
          .reset_index(level=0, drop=True).ffill()
          .rolling(rolling_win, closed="left", min_periods=rule)\
          .mean()
          .reindex(df.index)
      )
print(df_)
                            A
2013-01-01 09:00:00       NaN
2013-01-01 09:00:02       NaN
2013-01-01 09:00:03  0.333333
2013-01-01 09:00:05  1.666667
2013-01-01 09:00:06  2.333333
2013-01-01 09:00:10  4.000000

在这种情况下,它并不是很有趣,因为差距实际上很小,但如果差距很大,那么它就会变得有用。

EDIT 或其他选项,可能更好,union 所有由您删除 1s、2s、3s、...(取决于规则)的原始索引创建的索引。现在您只有滚动所需的索引,所以reindexffillrolling.mean。最后结果相同

from functools import reduce

rule = 3
rolling_win = f'{rule}s'

idx = df.index
df_ = (df.reindex(reduce(lambda x, y: x.union(y), 
                         [idx - pd.Timedelta(seconds=i) 
                          for i in range(0, rule+1)]))
         .ffill()
         .rolling(rolling_win, closed="left", min_periods=rule)\
         .mean()
         .reindex(df.index)
        )

【讨论】:

  • 我昨天根据您的 cmets 开发的另一种方法:python def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame: original_index = df.index.copy() avg = ( df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill") .rolling(avg_window, closed="both", min_periods=2) .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False)) .reindex(original_index) ) return avg
【解决方案2】:

受@Ben.T 启发的两种可能的解决方案:

def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses second resolution up-sampling only on smaller windows at a time."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))
        .reindex(original_index)
    )
    return avg


def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        .apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))
        .reindex(original_index)
    )
    return avg

第一个对单个滚动窗口进行一次上采样,而后者实际上通过确保在我们关心的窗口开始处始终有一个可用点来进行不规则时间加权平均。这是通过包含按窗口长度移动的原始索引来完成的。

我还没有衡量相关案例的表现。

编辑: 我决定在大约 100,000 行的第二个分辨率数据集上测试这些函数,并使用 20 分钟的窗口(!)这两种变体都慢得令人难以忍受,但我认为我有一个新的赢家:

def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
    avg = (
        avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)
        .divide(pd.Timedelta(avg_window).seconds)
        .rolling(avg_window, closed="left")
        .sum()
        .reindex(original_index)
    )
    avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan
    return avg

这个在滚动之前预先进行加权,因此我们使用.sum()而不是apply()。这意味着速度的巨大提升。无论平均窗口的大小如何,我们最多也可以将索引翻倍。

【讨论】:

  • 有意思,有时间我试试看:)
猜你喜欢
  • 2014-12-08
  • 1970-01-01
  • 2014-02-17
  • 1970-01-01
  • 2012-06-06
  • 1970-01-01
  • 2019-03-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多