我也遇到过类似的问题,看来我找到了办法:
1.获取文件列表
2.parallelize这个列表(分布在所有节点之间)
3.编写一个函数,从分发到节点的大列表部分读取所有文件的内容
4.用mapPartition运行它,然后将结果收集为一个列表,每个元素都是每个文件的收集内容。
存储在 AWS s3 和 json 文件上的 Fot 文件:
def read_files_from_list(file_list):
#reads files from list
#returns content as list of strings, 1 json per string ['{}','{}',...]
out=[]
for x in file_list:
content = sp.check_output([ 'aws', 's3', 'cp', x, '-']) # content of the file. x here is a full path: 's3://bucket/folder/1.json'
out.append(content)
return out #content of all files from the file_list as list of strings, 1 json per string ['{}','{}',...]
file_list=['f1.json','f2.json',...]
ps3="s3://bucket/folder/"
full_path_chunk=[ps3 + f for f in file_list] #makes list of strings, with full path for each file
n_parts = 100
rdd1 = sc.parallelize(full_path_chunk, n_parts ) #distribute files among nodes
list_of_json_strings = rdd1.mapPartitions(read_files_from_list).collect()
然后,如果需要,您可以像这样创建 spark 数据框:
rdd2=sc.parallelize(list_of_json_strings) #this is a trick! via http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
df_spark=sqlContext.read.json(rdd2)
函数read_files_from_list只是一个例子,应该改为使用python工具从hdfs读取文件。
希望这会有所帮助:)