【问题标题】:spark apply function to columns in parallel火花将功能并行应用于列
【发布时间】:2017-01-02 20:57:32
【问题描述】:

Spark 将并行处理数据,但不处理操作。在我的 DAG 中,我想为每列调用一个函数,例如 Spark processing columns in parallel 每列的值可以独立于其他列计算。有没有办法通过 spark-SQL API 实现这种并行性?利用窗口函数Spark dynamic DAG is a lot slower and different from hard coded DAG 有助于优化 DAG,但只能以串行方式执行。

可以找到包含更多信息的示例https://github.com/geoHeil/sparkContrastCoding

下面的最小示例:

val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

val inputToDrop = Seq("col3TooMany")
val inputToBias = Seq("col1", "col2")

val targetCounts = df.filter(df("TARGET") === 1).groupBy("TARGET").agg(count("TARGET").as("cnt_foo_eq_1"))
val newDF = df.toDF.join(broadcast(targetCounts), Seq("TARGET"), "left")
  newDF.cache
def handleBias(df: DataFrame, colName: String, target: String = target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }

val joinUDF = udf((newColumn: String, newValue: String, codingVariant: Int, results: Map[String, Map[String, Seq[Double]]]) => {
    results.get(newColumn) match {
      case Some(tt) => {
        val nestedArray = tt.getOrElse(newValue, Seq(0.0))
        if (codingVariant == 0) {
          nestedArray.head
        } else {
          nestedArray.last
        }
      }
      case None => throw new Exception("Column not contained in initial data frame")
    }
  })

现在我想将我的handleBias 函数应用于所有列,不幸的是,这不是并行执行的。

val res = (inputToDrop ++ inputToBias).toSet.foldLeft(newDF) {
    (currentDF, colName) =>
      {
        logger.info("using col " + colName)
        handleBias(currentDF, colName)
      }
  }
    .drop("cnt_foo_eq_1")

val combined = ((inputToDrop ++ inputToBias).toSet).foldLeft(res) {
    (currentDF, colName) =>
      {
        currentDF
          .withColumn("combined_" + colName, map(col(colName), array(col("pre_" + colName), col("pre2_" + colName))))
      }
  }

val columnsToUse = combined
    .select(combined.columns
      .filter(_.startsWith("combined_"))
      map (combined(_)): _*)

val newNames = columnsToUse.columns.map(_.split("combined_").last)
val renamed = columnsToUse.toDF(newNames: _*)

val cols = renamed.columns
val localData = renamed.collect

val columnsMap = cols.map { colName =>
    colName -> localData.flatMap(_.getAs[Map[String, Seq[Double]]](colName)).toMap
}.toMap

【问题讨论】:

    标签: scala apache-spark parallel-processing apache-spark-sql


    【解决方案1】:

    每列的值可以独立于其他列计算

    虽然这是真的,但它并不能真正帮助您解决问题。您可以生成多个独立的DataFrames,每个都有自己的添加,但这并不意味着您可以自动将其组合成一个执行计划。

    handleBias 的每个应用程序都会对您的数据进行两次洗牌,并且输出 DataFrames 的数据分布与父 DataFrame 的数据分布不同。这就是为什么当您fold 在列列表上时,每个添加都必须单独执行。

    理论上你可以设计一个可以表达的管道(使用伪代码):

    • 添加唯一标识:

      df_with_id = df.withColumn("id", unique_id())
      
    • 独立计算每个df并转换为格式:

      dfs = for (c in columns) 
        yield handle_bias(df, c).withColumn(
          "pres", explode([(pre_name, pre_value), (pre2_name, pre2_value)])
        )
      
    • 联合所有部分结果:

      combined = dfs.reduce(union)
      
    • pivot 将长格式转换为宽格式:

      combined.groupBy("id").pivot("pres._1").agg(first("pres._2"))
      

    但我怀疑这是否值得大惊小怪。您使用的进程非常繁重,需要大量的网络和磁盘 IO。

    如果总级别的数量 (sum count(distinct x)) for x in columns)) 相对较少,您可以尝试使用例如 aggregateByKeyMap[Tuple2[_, _], StatCounter] 一次通过计算所有统计信息,否则考虑下采样到可以在本地计算统计信息的级别。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-03
      • 1970-01-01
      • 1970-01-01
      • 2015-11-03
      相关资源
      最近更新 更多