【问题标题】:Pass a function with any case class return type as parameter将具有任何案例类返回类型的函数作为参数传递
【发布时间】:2019-02-24 12:45:13
【问题描述】:

这可能是一个愚蠢的问题,但我已经苦苦挣扎了一段时间。它确实类似于this question,但我无法在我的代码中应用它(与模式或作为一个函数)。

我想将 flatMap(或 map)转换函数传递给函数参数,然后将其代理到实际调用 df.rdd.flatMap 方法的策略函数。我会尽力解释的!

case class Order(id: String, totalValue: Double, freight: Double) 
case class Product(id: String, price: Double) 

... or any other case class, whatever one needs to transform a row into ...

实体类:

class Entity(path: String) = {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
      this.getStrategy.flatMap[T](mapFunction)
      return this
  }
  def save(path: String): Unit = {
      ... write logic ...
  } 
}

一个实体的方法可能有不同的策略。 EntityStrategy如下:

abstract class EntityStrategy(private val entity: Entity,
                              private val spark: SparkSession) {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
  def map[T](mapFunction: (Row) => T)
}

还有一个示例 EntityStrategy 实现:

class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
  extends EntityStrategy(entity, spark) {
  ...
  override def map[T](mapFunction: Row => T): Unit = {
    val rdd = this.getData.rdd.map(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }

  override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
    var rdd = this.getData.rdd.flatMap(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }
}

最后,我想创建一个 flatMap/map 函数并像这样调用它:

def transformFlatMap(row: Row): ArrayBuffer[Order] = {
    var orders = new ArrayBuffer[Order]
    var _deliveries = row.getAs[Seq[Row]]("deliveries")
    _deliveries.foreach(_delivery => {
       var order = Order(
           id = row.getAs[String]("id"),
           totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
      orders += order
    })
   return orders
}

val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")

当然,这是行不通的。我在 SparkEntityStrategy 上收到一个错误:

错误:(95, 35) 没有可用于 T 的 ClassTag val rdd = this.getData.rdd.map(f = mapFunction)

我尝试在实体方法和策略方法中添加(implicit encoder: Encoder: T),但没有成功。由于我是 Scala 新手,因此可能做错了什么。

如果我删除“T”并传递一个实际的案例类,一切正常。

【问题讨论】:

    标签: scala apache-spark dataframe case-class classtag


    【解决方案1】:

    为了满足编译器和 Spark 的方法,我需要添加以下类型标签:

    [T <: scala.Product : ClassTag : TypeTag]

    所以这两种方法都变成了:

    def map[T <: Product : ClassTag : TypeTag](mapFunction: (Row) => T): Entity
    def flatMap[T <: scala.Product : ClassTag : TypeTag](mapFunction: (Row) => TraversableOnce[T]): Entity
    

    关于scala.Product

    所有产品的基本特征,在标准库中包括 至少 scala.Product1 到 scala.Product22 ,因此也是他们的 从 scala.Tuple1 到 scala.Tuple22 的子类。此外,所有情况下 类使用综合生成的方法实现 Product。

    由于我使用案例类对象作为函数的返回类型,我需要 scala.Product 以便 Spark 的 createDataFrame 可以匹配正确的重载。

    为什么同时使用 ClassTagTypeTag

    通过删除 TypeTag,编译器会抛出以下错误:

    错误:(96, 48) T 没有可用的 TypeTag this.dataFrame = this.spark.createDataFrame(rdd)

    并删除 ClassTag

    错误:(95, 35) 没有可用于 T 的 ClassTag val rdd = this.getData.rdd.map(f = mapFunction)

    添加它们使两种方法都满意,并且一切都按预期工作。

    找到一个 good article 解释 Scala 中的类型擦除。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-29
      • 1970-01-01
      • 2021-04-02
      • 1970-01-01
      • 1970-01-01
      • 2018-02-17
      相关资源
      最近更新 更多