【Question Title】: Spark Scala Dataset map works in main but not in a function
【Posted】: 2017-11-05 09:28:24
【Question】:

I have two Datasets:

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("app").master("local[1]")
  .config("spark.executor.memory", "1g")
  .getOrCreate()


import spark.implicits._
val ds1 = /*read csv file*/.as[caseClass1]   
val ds2 = /*read csv file*/.as[caseClass2]  
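For context, the elided CSV reads might look roughly like this; the case-class fields, header options, and file paths are invented for illustration, since the post does not show them:

```scala
// Hypothetical case classes matching the CSV columns (assumptions, not from the post)
final case class caseClass1(id: Int, name: String)
final case class caseClass2(id: Int, score: Double)

import spark.implicits._

// Hypothetical paths; inferSchema/header settings are assumptions
val ds1 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/left.csv")
  .as[caseClass1]

val ds2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/right.csv")
  .as[caseClass2]
```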

Then I join and map them as follows:

val ds3 = ds1
  .joinWith(ds2, ds1("id") === ds2("id"))
  .map { case (left, right) => (left, Option(right)) }

and get the expected result.

The problem is that I am trying to build a RichDataset out of this and a few other functions, like so:

object Extentions {

  implicit class RichDataset[T <: Product](leftDs: Dataset[T]) {

    def leftJoinWith[V <: Product](rightDs: Dataset[V], condition: Column)
                                  (implicit spark: SparkSession): Dataset[(T, Option[V])] = {
      import spark.implicits._

      leftDs.joinWith(rightDs, condition, "left")
        .map { case (left, right) => (left, Option(right)) }
    }
  }
}

In main, after import Extentions._, the call to leftJoinWith fails to compile with:

Error:(15, 13) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. .map{case(left, right) => (left, Option(right))}

Error:(15, 13) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[(T, Option[V])])org.apache.spark.sql.Dataset[(T, Option[V])]. Unspecified value parameter evidence$6. .map{case(left, right) => (left, Option(right))}

...but spark.implicits._ is imported inside the function!

If I return just the join, without the map, it works both in main and in the function.
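For reference, the join-only variant that does compile could be sketched like this (the method name leftJoinOnly is invented here): joinWith needs no new implicit Encoder, because it builds the tuple encoder from the encoders already attached to the two input Datasets, whereas map must materialize a fresh Encoder[(T, Option[V])].

```scala
import org.apache.spark.sql.{Column, Dataset}

object ExtentionsJoinOnly {

  implicit class RichDataset[T <: Product](leftDs: Dataset[T]) {

    // Compiles without TypeTag: no new Encoder is summoned, joinWith reuses
    // the encoders of leftDs and rightDs to encode the (T, V) pairs.
    def leftJoinOnly[V <: Product](rightDs: Dataset[V],
                                   condition: Column): Dataset[(T, V)] =
      leftDs.joinWith(rightDs, condition, "left")
  }
}
```

Note that with a "left" join the right side of each pair can be null for unmatched rows, which is exactly what the .map to Option(right) was meant to clean up.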

scalaVersion := "2.11.8", sparkVersion := "2.2.0"

Thanks in advance!

【Discussion】:

Tags: scala apache-spark dataset implicit


【Solution 1】:

It works if you add a TypeTag context bound to the generic type parameters (a pattern also seen in Spark's own source code). The Product encoders provided by spark.implicits._ are derived at compile time via a TypeTag; inside a generic method T and V are erased, so without the TypeTag evidence the compiler cannot materialize the Encoder[(T, Option[V])] that map requires:

    import scala.reflect.runtime.universe.TypeTag
    import org.apache.spark.sql.{Column, Dataset, SparkSession}
    
    
    object Extentions {
    
      implicit class RichDataset[T <: Product : TypeTag](leftDs: Dataset[T]) {
    
        def leftJoinWith[V <: Product : TypeTag](rightDs: Dataset[V], condition:
        Column)(implicit spark: SparkSession) : Dataset[(T, Option[V])] = {
          import spark.implicits._
    
          leftDs.joinWith(rightDs, condition, "left")
            .map{case(left, right) => (left, Option(right))}
        }
      }
    }
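A usage sketch of the fixed extension, assuming the SparkSession setup from the question; the case classes and sample rows are invented for illustration:

```scala
import org.apache.spark.sql.SparkSession
import Extentions._

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("app").master("local[1]")
  .getOrCreate()
import spark.implicits._

// Hypothetical case classes, not from the original post
final case class User(id: Int, name: String)
final case class Score(id: Int, value: Double)

val users  = Seq(User(1, "a"), User(2, "b")).toDS()
val scores = Seq(Score(1, 0.5)).toDS()

// User(2, "b") has no match on the right, so its second element is None
val joined = users.leftJoinWith(scores, users("id") === scores("id"))
joined.show(false)
```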
    

【Discussion】:
