【问题标题】:spark map partitions to fill nan values火花映射分区以填充 nan 值
【发布时间】:2016-12-18 23:26:28
【问题描述】:

我想使用最后一个众所周知的观察来填充 spark 中的 nan 值 - 请参阅:Spark / Scala: fill nan with last good observation

我当前的解决方案使用窗口函数来完成任务。但这不是很好,因为所有值都映射到一个分区中。 val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) } 应该工作得更好。但奇怪的是我的fill 函数没有执行。我的代码有什么问题?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

这是完整的示例代码:

import java.sql.Date

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class FooBar(foo: Date, bar: String)

object WindowFunctionExample extends App {

  Logger.getLogger("org").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show

  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }.collectAsMap
  println("###################### carry ")
  println(toCarry)
  println(toCarry.foreach(println))
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)

  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map(row => {
      if (!notMissing(row))1
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    })
  }

  // The algorithm does not step into the for loop for filling the null values. Strange
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

编辑

我修复了评论中概述的代码。但是 toCarryBd 包含 None 值。当我为

明确过滤时,怎么会发生这种情况
def notMissing(row: FooBar): Boolean = {row.foo != null}
iter.filter(notMissing(_)).toSeq.lastOption

None 值。

(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)

当尝试访问toCarryBd 时,这会导致NoSuchElementException: None.get

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd


    【解决方案1】:

    首先,如果您的foo 字段可以为空,我建议将案例类创建为:

    case class FooBar(foo: Option[Date], bar: String)
    

    然后,您可以将 notMissing 函数重写为:

    def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
    

    【讨论】:

    • 你能解释一下为什么地图会产生很多None 条目吗?
    • 如果 Seq 为空,@GeorgHeiler iter.filter(notMissing(_)).toSeq.lastOption 将返回 none。
    • 谢谢。而为什么原来的df只包含4行却执行了8次?
    • @GeorgHeiler 每个分区执行一次。您可能正在运行您的代码,总共有 8 个执行器核心。
    • 我在这里建立了一个例子gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 仍然没有值被放入“空”分区的映射中。你知道如何解决这个问题吗? FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("‌​2016-01-01")), "DUMMY")).foo, foo.bar) 将只使用默认情况。 (编辑:更新的网址)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-07-20
    • 2019-07-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多