【问题标题】:Scala spark: Create List of Dataset from a Dataset map operationScala spark:从数据集映射操作创建数据集列表
【发布时间】:2020-04-04 20:40:47
【问题描述】:

假设我想在转换另一个数据集后创建 2 种类型的指标:metricA 或 metricB。如果满足某个条件,它将同时生成 metricA 和 B,如果不满足条件,则仅生成 metric A。想法是将 2 个 metric 写入 2 个不同的路径(pathA,pathB)。

我采取的方法是创建一个 GeneralMetric 的 Dataset,然后根据里面的内容,写入不同的路径,但显然它不起作用,因为 Dataset 内部的模式匹配不起作用

val s: SparkSession = SparkSession
    .builder()
    .appName("Metric")
    .getOrCreate()
import s.implicits._

case class original (id : Int, units: List[Double])

case class MetricA (a: Int, b: Int, filtered_unit: List[Double])
case class MetricB (a: Int, filtered_unit: List[Double])
case class GeneralMetric(metricA: MetricA, metricB: Option[MetricB])

def createA: MetricA = {
    MetricA(1, 1, List(1.0, 2.0)
}

def createB: MetricB = {
    MetricB(1, List(10.0, 20.0)
}
def create (isBoth: Boolean): GeneralMetric = {
    if(isBoth) {
       val a: MetricA = createA()
       val b: MetricB = createB()
       GeneralMetric(a, Some(b))
    }
    else {
       val a: MetricA = createA()
       GeneralMetric(a, None)
    }
}

val originalDF: DataFrame

val result : Dataset[GeneralMetric] =
                 originalDF.as[original]
                 .map { r =>
                      if(r.id == 21) create(true)
                      else create(false)
                 }

val pathA: String = "s3://pathA"
val pathB: String = "s3://pathB"

//below code obviously wouldn't work
result.map(x => {
    case (metricA, Some(metricB)) => {
      metricA.write.parquet(pathA)
      metricB.write.parquet(pathB)
    }
    case (metricA, None) => metricA.write.parquet(pathA)

  })

我想到的下一个方法是将结果放入 List[GeneralMetric] 中,其中 GeneralMetric 是 密封轨迹,由 MetricA 和 MetricB 扩展,但我如何制作数据集转换返回一个 GeneralMetric 列表。

任何想法都会有所帮助

【问题讨论】:

    标签: scala apache-spark generics


    【解决方案1】:

    为什么不

    result.map({
        case (metricA, Some(metricB)) =>
          metricA.write.parquet(pathA)
          metricB.write.parquet(pathB)
        case (metricA, None) => metricA.write.parquet(pathA)
    
      })
    

    在你的情况下工作?这只是语法问题吗?


    另外:您似乎是独立发送指标(或至少在此示例中)。您可以将其建模为:

    sealed trait Metric {
      def write
    }
    case class MetricA (a: Int, b: Int, filtered_unit: List[Double]) extends Metric {
      override def write: Unit = ???
    }
    case class MetricB (a: Int, filtered_unit: List[Double]) extends Metric {
      override def write: Unit = ???
    }
    

    然后打电话

    implicit val enc: Encoder[Metric] = Encoders.kryo[Metric]
    val result: Dataset[Metric] =
        originalDF.as[original]
          .flatMap { r =>
            if (r.id == 21) createA :: createB :: Nil
            else createA :: Nil
          }
    result.foreach(metric.write.parquet())
    

    【讨论】:

    • 感谢您分享替代方案,我更喜欢您的方式,因为添加更多指标比每次更新一般指标要容易得多。但是,我仍然面临着写出 diff 的老敌人。不同路径的指标。我的意思是我留下了 Dataset[Metric] 并且上面的 foreach 不起作用。 write 函数没有任何 parquet DataFrameWriter 实用程序。知道如何写入差异路径:pathA,pathB 另外我认为数据集上的 foreach 将尝试访问数据集的每一行
    • 我已经发布了一个单独的问题 reg 写入不同的路径。我接受了你的回答,因为它解决了我原来的问题。你能看看吗:stackoverflow.com/questions/61054782/…
    猜你喜欢
    • 2021-09-01
    • 2018-10-07
    • 2018-10-01
    • 2020-08-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-01
    • 1970-01-01
    相关资源
    最近更新 更多