步骤如下
- 使用 sparkcontext.wholeTextFiles("/path/to/folder/ contains/all/files")
- 上面返回一个RDD,其中key是文件的路径,value是文件的内容
- rdd.map(lambda x:x[1]) - 这给你一个只有文件内容的rdd
- rdd.map(lambda x: customeFunctionToProcessFileContent(x))
- 由于 map 函数是并行工作的,因此您执行的任何操作都会更快且不连续 - 只要您的任务不相互依赖,这是并行性的主要标准
import os
import time
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# reading all the files to create PairRDD
input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*",2)
#convert RDD to DF
input_df=spark.createDataFrame(input_rdd)
input_df.show(truncate=False)
'''
+---------------------------------------+------------+
|_1 |_2 |
+---------------------------------------+------------+
|file:/home/user/datatest/test.txt |1,2,3 1,2,3|
|file:/home/user/datatest/test.txt1 |4,5,6 6,7,6|
+---------------------------------------+------------+
'''
input_df.select("_2").take(2)
#[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]
# function to get a creation time of a file
def time_convesion(filename):
return time.ctime(os.path.getmtime(filename.split(":")[1]))
#udf registration
time_convesion_udf = udf(time_convesion, StringType())
#udf apply over the DF
final_df = input_df.withColumn("created_time", time_convesion_udf(input_df['_1']))
final_df.show(2,truncate=False)
'''
+---------------------------------------+------------+------------------------+
|_1 |_2 |created_time |
+---------------------------------------+------------+------------------------+
|file:/home/user/datatest/test.txt |1,2,3 1,2,3|Sat Jul 11 18:31:03 2020|
|file:/home/user/datatest/test.txt1 |4,5,6 6,7,6|Sat Jul 11 18:32:43 2020|
+---------------------------------------+------------+------------------------+
'''
# proceed with the next steps for the implementation
上述方法适用于默认分区。所以你可能不会得到输入文件数等于输出文件数(因为输出是分区数)。
您可以根据计数或基于您的数据的任何其他唯一值对 RDD 重新分区,因此最终输出文件计数等于输入计数。这种方法仅具有并行性,但不会达到最佳分区数的性能