【发布时间】:2020-02-12 23:49:16
【问题描述】:
我有一组文件。文件的路径保存在一个文件中。比如all_files.txt。使用 apache spark,我需要对所有文件进行操作并获得结果。
我想做的步骤是:
- 通过读取
all_files.txt创建一个RDD - 对于
all_files.txt中的每一行(每一行都是某个文件的路径), 将每个文件的内容读入单个 RDD - 然后对所有内容做一个操作
这是我写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这是抛出错误:
第 323 行,在 get_return_value py4j.protocol.Py4JError 中:错误 调用 o25.getnewargs 时发生。跟踪:py4j.Py4JException: 方法 getnewargs([]) 不存在于 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 在 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) 在 py4j.Gateway.invoke(Gateway.java:272) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:214) 在 java.lang.Thread.run(Thread.java:745)
谁能告诉我我做错了什么以及我应该如何进一步处理。提前致谢。
【问题讨论】:
标签: python apache-spark pyspark flatmap