【发布时间】:2019-02-24 12:45:13
【问题描述】:
这可能是一个愚蠢的问题,但我已经苦苦挣扎了一段时间。它确实类似于this question,但我无法在我的代码中应用它(与模式或作为一个函数)。
我想将 flatMap(或 map)转换函数传递给函数参数,然后将其代理到实际调用 df.rdd.flatMap 方法的策略函数。我会尽力解释的!
case class Order(id: String, totalValue: Double, freight: Double)
case class Product(id: String, price: Double)
... or any other case class, whatever one needs to transform a row into ...
实体类:
class Entity(path: String) = {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
this.getStrategy.flatMap[T](mapFunction)
return this
}
def save(path: String): Unit = {
... write logic ...
}
}
一个实体的方法可能有不同的策略。 EntityStrategy如下:
abstract class EntityStrategy(private val entity: Entity,
private val spark: SparkSession) {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
def map[T](mapFunction: (Row) => T)
}
还有一个示例 EntityStrategy 实现:
class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
extends EntityStrategy(entity, spark) {
...
override def map[T](mapFunction: Row => T): Unit = {
val rdd = this.getData.rdd.map(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
var rdd = this.getData.rdd.flatMap(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
}
最后,我想创建一个 flatMap/map 函数并像这样调用它:
def transformFlatMap(row: Row): ArrayBuffer[Order] = {
var orders = new ArrayBuffer[Order]
var _deliveries = row.getAs[Seq[Row]]("deliveries")
_deliveries.foreach(_delivery => {
var order = Order(
id = row.getAs[String]("id"),
totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
orders += order
})
return orders
}
val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")
当然,这是行不通的。我在 SparkEntityStrategy 上收到一个错误:
错误:(95, 35) 没有可用于 T 的 ClassTag val rdd = this.getData.rdd.map(f = mapFunction)
我尝试在实体方法和策略方法中添加(implicit encoder: Encoder: T),但没有成功。由于我是 Scala 新手,因此可能做错了什么。
如果我删除“T”并传递一个实际的案例类,一切正常。
【问题讨论】:
标签: scala apache-spark dataframe case-class classtag