【问题标题】:Spark RDD to CSV - Add empty columnsSpark RDD 到 CSV - 添加空列
【发布时间】:2015-06-30 12:06:06
【问题描述】:

我有一个 RDD[Map[String,Int]],其中地图的键是列名。每个地图都不完整,要知道我需要合并所有键的列名。有没有办法避免这种收集操作以了解所有密钥并只使用一次 rdd.saveAsTextFile(..) 来获取 csv?

例如,假设我有一个包含两个元素的 RDD(scala 表示法):

Map("a"->1, "b"->2)
Map("b"->1, "c"->3)

我想最终得到这个 csv:

a,b,c
1,2,0
0,1,3

Scala 解决方案更好,但任何其他与 Spark 兼容的语言都可以。

编辑:

我也可以尝试从另一个方向解决我的问题。假设我一开始就知道所有列,但我想摆脱所有地图中值为 0 的列。所以问题就变成了,我知道键是 ("a", "b", "c") 并且由此而来:

Map("a"->1, "b"->2, "c"->0)
Map("a"->3, "b"->1, "c"->0)

我需要写 csv:

a,b
1,2
3,1

是否可以只用一次收集来做到这一点?

【问题讨论】:

    标签: csv apache-spark


    【解决方案1】:

    如果你的说法是:“我的 RDD 中的每一个新元素都可能添加一个我迄今为止没有见过的新列名”,答案显然是无法避免全扫描。但是您不需要收集驱动程序上的所有元素。

    您可以使用aggregate 仅收集列名。该方法有两个功能,一个是将单个元素插入到结果集合中,另一个是将来自两个不同分区的结果合并。

    rdd.aggregate(Set.empty[String])( {(s, m) => s union m.keySet }, { (s1, s2) => s1 union s2 })
    

    您将获得 RDD 中所有列名的集合。在第二次扫描中,您可以打印 CSV 文件。

    【讨论】:

      【解决方案2】:

      Scala 和任何其他支持的语言

      您可以使用spark-csv

      首先让我们找到所有存在的列:

      val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect())
      

      创建 RDD[Row]:

      val rows = rdd.map {
          row => { Row.fromSeq(cols.value.map { row.getOrElse(_, 0) })}
      }
      

      准备架构:

      import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
      
      val schema = StructType(
          cols.value.map(field => StructField(field, IntegerType, true)))
      

      将 RDD[Row] 转换为数据框:

      val df = sqlContext.createDataFrame(rows, schema)
      

      写结果:

      // Spark 1.4+, for other versions see spark-csv docs
      df.write.format("com.databricks.spark.csv").save("mycsv.csv")
      

      您可以使用其他支持的语言做几乎相同的事情。

      Python

      如果您使用 Python 并且最终数据适合驱动程序内存,您可以通过 toPandas() 方法使用 Pandas:

      rdd = sc.parallelize([{'a': 1, 'b': 2}, {'b': 1, 'c': 3}])
      cols = sc.broadcast(rdd.flatMap(lambda row: row.keys()).distinct().collect())
      
      df = sqlContext.createDataFrame(
          rdd.map(lambda row: {k: row.get(k, 0) for k in cols.value}))
      
      df.toPandas().save('mycsv.csv')
      

      或直接:

      import pandas as pd 
      pd.DataFrame(rdd.collect()).fillna(0).save('mycsv.csv')
      

      编辑

      第二个collect 的一种可能方法是使用累加器来构建一组所有列名或在找到零的地方计算这些列名,并使用此信息映射行并删除不必要的列或添加零。

      这是可能的,但效率低下,感觉像是在作弊。唯一有意义的情况是零的数量非常少,但我想这里不是这种情况。

      object ColsSetParam extends AccumulatorParam[Set[String]] {
      
        def zero(initialValue: Set[String]): Set[String] = {
          Set.empty[String]
        }
      
        def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
          s1 ++ s2
        }
      }
      
      val colSetAccum = sc.accumulator(Set.empty[String])(ColsSetParam)
      rdd.foreach { colSetAccum += _.keys.toSet } 
      

      // We assume you know this upfront
      val allColnames = sc.broadcast(Set("a", "b", "c"))
      
      object ZeroColsParam extends AccumulatorParam[Map[String, Int]] {
      
        def zero(initialValue: Map[String, Int]): Map[String, Int] = {
          Map.empty[String, Int]
        }
      
        def addInPlace(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] = {
          val keys = m1.keys ++ m2.keys
          keys.map(
            (k: String) => (k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))).toMap
        }
      }
      
      val accum = sc.accumulator(Map.empty[String, Int])(ZeroColsParam)
      
      rdd.foreach { row =>
        // If allColnames.value -- row.keys.toSet is empty we can avoid this part
        accum += (allColnames.value -- row.keys.toSet).map(x => (x -> 1)).toMap
      }
      

      【讨论】:

      • 谢谢,但这会收集两个,对吧?我的问题不是关于如何编写 csv,而是是否可以避免第一次收集 :)
      • 我的错。理论上可以使用累加器,但看起来根本不是一个好主意。
      猜你喜欢
      • 1970-01-01
      • 2015-07-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-13
      • 2015-07-06
      • 1970-01-01
      • 2018-12-05
      相关资源
      最近更新 更多