【问题标题】:Use a method inside a UDF function Spark Scala在 UDF 函数 Spark Scala 中使用方法
【发布时间】:2017-09-22 19:21:03
【问题描述】:

我想在用户设计的函数中使用位于另一个类中的方法,但它不起作用。

我有一个方法:

 def traitementDataFrameEleve(sc:SparkSession, dfRedis:DataFrame, domainMail:String, dir:String):Boolean ={
     def loginUDF = udf((sn: String, givenName:String) => {
            LoginClass.GenerateloginPersone(sn,givenName,dfr)
          })

    dfEleve.withColumn("ENTPersonLogin",loginUDF(dfEleve("sn"),dfEleve("givenName")))
}

LoginClass 是一个包含 GenerateloginPerson 方法的类。

输出错误:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$loginUDF$1$1: (string, string) => string)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.schema(Dataset.scala:410)
    at org.apache.spark.sql.Dataset.printSchema(Dataset.scala:419)
    at IntegrationDonneesENTLea_V1_AcBordeaux.LoginClass$.GenerateloginPersone(LoginClass.scala:16)
    at IntegrationDonneesENTLea_V1_AcBordeaux.Eleve$$anonfun$loginUDF$1$1.apply(Eleve.scala:25)
    at IntegrationDonneesENTLea_V1_AcBordeaux.Eleve$$anonfun$loginUDF$1$1.apply(Eleve.scala:23)
    ... 16 more

谢谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql user-defined-functions


    【解决方案1】:

    不允许访问:

    • 分布式数据结构(如DatasetRDD)。
    • SparkConext / SparkSession

    来自 Spark 任务(转换,udf 应用程序)。这就是你获得 NPE 的原因。

    【讨论】:

    • @MounirHamdane 要做什么?您可以从 UDF 中调用一个方法(根据您的帖子标题),这里的问题是您的方法的 contents - GenerateloginPersone 似乎调用了 Dataset.printSchema -这是你做不到的。由于我们看不到GenerateloginPersone 的实现,我们无法提供更多关于它可以或应该如何实现的见解——我们不知道它做了什么。因此,总结一下:是的,您可以从 UDF 中调用方法;但是这些方法不能尝试使用SparkContext / SparkSession 或任何使用它们的东西(如Dataset / RDD)。
    猜你喜欢
    • 2016-12-02
    • 2021-02-23
    • 1970-01-01
    • 1970-01-01
    • 2017-09-16
    • 2018-02-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多