【问题标题】:Failed to execute user defined function on a dataframe in Spark (Scala)无法在 Spark(Scala)中的数据帧上执行用户定义的函数
【发布时间】:2019-08-22 18:31:23
【问题描述】:

我有一个数据框 df,如下所示

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

注意:/file/dir 不同。并非所有文件都存储在同一目录中。事实上,不同的目录中有数百个文件。

我在这里要完成的是读取列路径中的文件并计算文件中的记录并将行计数的结果写入数据帧的新列。

我尝试了以下函数和udf:

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf (col("path"))).show()

这会导致以下错误

org.apache.spark.SparkException: Failed to execute user defined fu
nction($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

我试图在收集时遍历该列

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

在这里它工作得很好,但我想将结果存储在 df...

我尝试了几种解决方案,但我在这里面临死胡同。

【问题讨论】:

    标签: scala apache-spark user-defined-functions


    【解决方案1】:

    这不起作用,因为数据帧只能在驱动程序 JVM 中创建,但 UDF 代码在执行程序 JVM 中运行。您可以做的是将 CSV 加载到单独的数据框中,并使用文件名列丰富数据:

    val csvs = spark
     .read
     .format("csv")
     .load("/file/dir/")
     .withColumn("filename", input_file_name())
    

    然后在filename列上加入原来的df

    【讨论】:

    • 嘿 ollik1,/file/dir 不同。并非所有文件都存储在同一目录中。事实上,各种目录中有数百个文件。我无法一步从 hdfs 加载所有内容。文件的大小从数百 mbs 到一些 gbs 不等..
    • @datanin 可以定义多个位置 stackoverflow.com/questions/24029873/… 。文件加载是懒惰的,所以如果它运行良好,至少值得一试
    • 一个数据框是一个分布式的数据集合,所以我不明白为什么我不能在df上使用UDF。尽管如此,使用文件名列的解决方法是一个好主意,我会尝试那个。如果可行,这将是我的解决方案。谢谢
    • 嗨 ollik1,现在我了解司机/工人的问题了。它不是数据帧,而是不可序列化的 UDF。
    【解决方案2】:

    我通过以下方式解决了这个问题:

    val queue = df.select("path").as[String].collect()
    val countResult = for (item <- queue) yield {
        val rowCount = (item, spark.read.format("csv").option("header", "false").load(item).count)
        rowCount
    }
    
    val df2 = spark.createDataFrame(countResult)
    

    后来我用 df2 加入了 df...

    这里的问题是 @ollik1 在 udfs 的驱动程序/工作程序架构中提到的。 UDF 不可序列化,我需要 spark.read 函数。

    【讨论】:

      【解决方案3】:

      怎么样? :

      def executeRowCount = udf((fileCount: String) => {
        spark.read.format("csv").option("header", "false").load(fileCount).count
      })
      
      df.withColumn("row_count", executeRowCount(col("path"))).show()
      

      【讨论】:

        【解决方案4】:

        可能是这样的吗?

          sqlContext
            .read
            .format("csv")
            .load("/tmp/input/")
            .withColumn("filename", input_file_name())
            .groupBy("filename")
            .agg(count("filename").as("record_count"))
            .show()
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-12-18
          • 1970-01-01
          • 1970-01-01
          • 2017-08-11
          • 2023-03-25
          • 2019-08-27
          • 1970-01-01
          • 2020-10-15
          相关资源
          最近更新 更多