【问题标题】:PySpark Throwing error Method __getnewargs__([]) does not existPySpark 抛出错误方法 __getnewargs__([]) 不存在
【发布时间】:2020-02-12 23:49:16
【问题描述】:

我有一组文件。文件的路径保存在一个文件中。比如all_files.txt。使用 apache spark,我需要对所有文件进行操作并获得结果。

我想做的步骤是:

  • 通过读取all_files.txt 创建一个RDD
  • 对于all_files.txt 中的每一行(每一行都是某个文件的路径), 将每个文件的内容读入单个 RDD
  • 然后对所有内容做一个操作

这是我写的代码:

def return_contents_from_file (file_name):
    return spark.read.text(file_name).rdd.map(lambda  r: r[0])

def run_spark():
    file_name = 'path_to_file'

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
        .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
        .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files

这是抛出错误:

第 323 行,在 get_return_value py4j.protocol.Py4JError 中:错误 调用 o25.getnewargs 时发生。跟踪:py4j.Py4JException: 方法 getnewargs([]) 不存在于 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 在 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) 在 py4j.Gateway.invoke(Gateway.java:272) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:214) 在 java.lang.Thread.run(Thread.java:745)

谁能告诉我我做错了什么以及我应该如何进一步处理。提前致谢。

【问题讨论】:

    标签: python apache-spark pyspark flatmap


    【解决方案1】:

    不允许在 flatMap 内使用 spark 或在执行程序上发生的任何转换(spark 会话仅在驱动程序上可用)。也不可能创建 RDD 的 RDD(参见:Is it possible to create nested RDDs in Apache Spark?

    但是您可以通过另一种方式实现这种转换 - 将 all_files.txt 的所有内容读入数据帧,使用 local map 将它们设为数据帧和 local @987654327 @ 联合所有,见例子:

    >>> filenames = spark.read.text('all_files.txt').collect()
    >>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
    >>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
    

    【讨论】:

    • 感谢您的回复。但是我如何并行化整个过程呢? map(lambda r: spark.read.text(r[0]), filenames) 不会序列化整个过程吗?
    • 读取文件的过程是并行运行的,唯一序列化的部分是构建执行计划。试试看!
    【解决方案2】:

    今天遇到这个问题,终于发现我在pandas_udf中引用了一个spark.DataFrame对象,导致这个错误。

    结论:

    您不能在udfpandas_udf 中使用sparkSession 对象、spark.DataFrame 对象或其他Spark 分布式对象,因为它们是未腌制的。

    如果您遇到这个错误并且您使用的是udf,请仔细检查,一定是相关问题。

    【讨论】:

    • 那你的解决方案是什么?
    • @Rock 仅确保在 udfpandas_udf 中没有 Spark 分布式对象
    • 同样,当我不小心从 pandas_udf 返回了一个 spark 数据帧时,我弹出了这个错误。
    【解决方案3】:

    当模型本身是 pyspark.ml.classification 模型时,我在尝试使用 mlflow.sklearn.log_model 使用 MLFlow 记录我的模型时也遇到了这个错误。使用mlflow.spark.log_model 解决了这个问题。

    【讨论】:

      猜你喜欢
      • 2017-04-12
      • 1970-01-01
      • 2017-07-11
      • 2018-02-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-27
      • 2019-01-14
      相关资源
      最近更新 更多