【发布时间】:2025-12-02 06:20:05
【问题描述】:
我有大量相当大的日常文件存储在博客存储引擎(S3、Azure datalake exc.. exc..)data1900-01-01.csv, data1900-01-02.csv,....,data2017-04-27.csv。我的目标是执行滚动 N 天线性回归,但我在数据加载方面遇到了麻烦。如果没有嵌套 RDD's.,我不确定如何做到这一点
每个.csv 文件的架构都是相同的。
换句话说,对于每个日期d_t,我需要数据x_t 并加入数据(x_t-1, x_t-2,... x_t-N)。
如何使用 PySpark 加载这些日常文件的 N 天窗口?我能找到的所有 PySpark 示例似乎都是从一个非常大的文件或数据集加载的。
这是我当前代码的示例:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)
def test_run(date_range):
dt0 = date_range[-1] #get the latest date
s = '/daily/data{}.csv'
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORM')
file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORM')
return 1
p.filter(test_run)
p.map(test_run) #fails with same error as p.filter
我正在使用 PySpark 版本 '2.1.0'
我在 Azure HDInsight 集群 jupyter notebook 上运行它。
spark 这里是<class 'pyspark.sql.session.SparkSession'> 类型
一个更小更可重现的例子如下:
p = sc.parallelize([1, 2, 3])
def foo(date_range):
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
return 1
p.filter(foo).count()
【问题讨论】:
-
在下面查看我的更新答案
-
@Pushkr 我再次更新它以使其更清晰,删除窗口函数调用并放置日期示例。只是那个例子对我来说失败了,我可以通过直接调用
test_run(dates[0])来运行它 -
您是否尝试过使用
spark.read.csv(folder)将所有数据直接加载到Dataframe 中,然后添加一个名为(file_name).withColumn("filename", input_file_name())的新列然后只需使用基于此列的分组来进一步操作数据框可以进行 N 天线性回归吗? -
@Teodor-BogdanBarbieru 不,我没有尝试过,稍后会尝试,谢谢您的建议。
标签: csv pandas apache-spark pyspark