Scala 和任何其他支持的语言
您可以使用spark-csv
首先让我们找到所有存在的列:
val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect())
创建 RDD[Row]:
val rows = rdd.map {
row => { Row.fromSeq(cols.value.map { row.getOrElse(_, 0) })}
}
准备架构:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
val schema = StructType(
cols.value.map(field => StructField(field, IntegerType, true)))
将 RDD[Row] 转换为数据框:
val df = sqlContext.createDataFrame(rows, schema)
写结果:
// Spark 1.4+, for other versions see spark-csv docs
df.write.format("com.databricks.spark.csv").save("mycsv.csv")
您可以使用其他支持的语言做几乎相同的事情。
Python
如果您使用 Python 并且最终数据适合驱动程序内存,您可以通过 toPandas() 方法使用 Pandas:
rdd = sc.parallelize([{'a': 1, 'b': 2}, {'b': 1, 'c': 3}])
cols = sc.broadcast(rdd.flatMap(lambda row: row.keys()).distinct().collect())
df = sqlContext.createDataFrame(
rdd.map(lambda row: {k: row.get(k, 0) for k in cols.value}))
df.toPandas().save('mycsv.csv')
或直接:
import pandas as pd
pd.DataFrame(rdd.collect()).fillna(0).save('mycsv.csv')
编辑
第二个collect 的一种可能方法是使用累加器来构建一组所有列名或在找到零的地方计算这些列名,并使用此信息映射行并删除不必要的列或添加零。
这是可能的,但效率低下,感觉像是在作弊。唯一有意义的情况是零的数量非常少,但我想这里不是这种情况。
object ColsSetParam extends AccumulatorParam[Set[String]] {
def zero(initialValue: Set[String]): Set[String] = {
Set.empty[String]
}
def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
s1 ++ s2
}
}
val colSetAccum = sc.accumulator(Set.empty[String])(ColsSetParam)
rdd.foreach { colSetAccum += _.keys.toSet }
或
// We assume you know this upfront
val allColnames = sc.broadcast(Set("a", "b", "c"))
object ZeroColsParam extends AccumulatorParam[Map[String, Int]] {
def zero(initialValue: Map[String, Int]): Map[String, Int] = {
Map.empty[String, Int]
}
def addInPlace(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] = {
val keys = m1.keys ++ m2.keys
keys.map(
(k: String) => (k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))).toMap
}
}
val accum = sc.accumulator(Map.empty[String, Int])(ZeroColsParam)
rdd.foreach { row =>
// If allColnames.value -- row.keys.toSet is empty we can avoid this part
accum += (allColnames.value -- row.keys.toSet).map(x => (x -> 1)).toMap
}