【问题标题】:dask dataframes -time series partitionsdask 数据帧 - 时间序列分区
【发布时间】:2024-08-01 08:05:02
【问题描述】:

我有一个时间序列 pandas 数据框,我想按月和年进行分区。我的想法是获取一个日期时间列表,作为索引,但中断不会发生在月初的 0:00 开始..

monthly_partitons=np.unique(df.index.values.astype('datetime64[M]')).tolist()
da=dd.from_pandas(df, npartitions=1)

如何将索引设置为从每个月开始?我试过npartitions=len(monthly_partitions),但我意识到这是错误的,因为它可能不会在开始时的日期分区。应该如何确保它在当月的第一个日期分区?

更新:

使用da=da.repartition(freq='1M')将数据从10分钟数据重新采样到1分钟数据见下文

Dask DataFrame Structure:
Open    High    Low Close   Vol OI  VI  
npartitions=5037050                             
2008-05-04 18:00:00 float64 float64 float64 float64 int64   int64   float64 int32
2008-05-04 18:01:00 ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
2017-12-01 16:49:00 ... ... ... ... ... ... ... ...
2017-12-01 16:50:00 ... ... ... ... ... ... ... ...
Dask Name: repartition-merge, 10074101 tasks

更新 2:

这是重现问题的代码

import pandas as pd
import datetime as dt
import dask as dsk
import numpy as np
import dask.dataframe as dd

ts=pd.date_range("2015-01-01 00:00", " 2015-05-01 23:50", freq="10min")
df = pd.DataFrame(np.random.randint(0,100,size=(len(ts),4)), columns=list('ABCD'), index=ts)
ddf=dd.from_pandas(df,npartitions=1)
ddf=ddf.repartition(freq='1M')
ddf

【问题讨论】:

    标签: pandas dask


    【解决方案1】:

    假设您的数据框已按时间编入索引,您应该能够使用 repartition method 来完成此操作。

    df = df.repartition(freq='1M')
    

    在上面的 MCVE 之后编辑

    (感谢您添加最小且完整的示例!)

    有趣的是,这看起来像是一个错误,无论是在 pandas 还是 dask 中。我假设'1M' 意味着一个月,(就像在pd.date_range 中一样)

    In [12]: pd.date_range('2017-01-01', '2017-12-15', freq='1M')
    Out[12]: 
    DatetimeIndex(['2017-01-31', '2017-02-28', '2017-03-31', '2017-04-30',
                   '2017-05-31', '2017-06-30', '2017-07-31', '2017-08-31',
                   '2017-09-30', '2017-10-31', '2017-11-30'],
                  dtype='datetime64[ns]', freq='M')
    

    然而,当传递给pd.Timedelta 时,它意味着一分钟

    In [13]: pd.Timedelta('1M')
    Out[13]: Timedelta('0 days 00:01:00')
    
    In [14]: pd.Timedelta('1m')
    Out[14]: Timedelta('0 days 00:01:00')
    

    所以它挂了,因为它试图创建的分区比你预期的要多 43200 个 :)

    我们应该为此提交错误报告(您有兴趣这样做吗?)。一个短期的解决方法是自己明确指定部门。

    In [17]: divisions = pd.date_range('2015-01-01', '2015-05-01', freq='1M').tolist
        ...: ()
        ...: divisions[0] = ddf.divisions[0]
        ...: divisions[-1] = ddf.divisions[-1]
        ...: ddf.repartition(divisions=divisions)
        ...: 
    Out[17]: 
    Dask DataFrame Structure:
                             A      B      C      D
    npartitions=3                                  
    2015-01-01 00:00:00  int64  int64  int64  int64
    2015-02-28 00:00:00    ...    ...    ...    ...
    2015-03-31 00:00:00    ...    ...    ...    ...
    2015-05-01 23:50:00    ...    ...    ...    ...
    Dask Name: repartition-merge, 7 tasks
    

    【讨论】:

    • 我试过那个方法它挂断了需要中断来停止进程。我设置分区数时没有这个问题。好吧,分区放错了地方,我仍然能够传递 DataFrame 并处理
    • 感谢 MCVE,它使识别和解决问题变得更加容易。在编辑中回答。
    • 感谢您的帮助,因为我怀疑它在 timedelta 中
    【解决方案2】:

    如果您想按每个月的第一天进行分区,请使用以下命令:

    ddf.repartition(freq='MS')
    

    其中MS 表示月份开始。更多DateOffset对象的信息可以在pandas docs中找到

    【讨论】: