【问题标题】:Apache Spark: Splitting Pair RDD into multiple RDDs by key to save valuesApache Spark:通过键将Pair RDD拆分为多个RDD以保存值
【发布时间】:2014-09-22 15:33:43
【问题描述】:

我正在使用 Spark 1.0.1 处理大量数据。每行包含一个 ID 号,其中一些具有重复的 ID。我想将具有相同 ID 号的所有行保存在同一位置,但我无法有效地执行此操作。我创建了一个(ID 号,数据行)对的 RDD[(String, String)]:

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

一种可行但不高效的方法是收集 ID 号,过滤每个 ID 的 RDD,并将具有相同 ID 的值的 RDD 保存为文本文件。

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

我还尝试了 groupByKey 或 reduceByKey,这样 RDD 中的每个元组都包含一个唯一的 ID 号作为键,以及由该 ID 号的新行分隔的一串组合数据行。我只想使用 foreach 遍历 RDD 一次来保存数据,但它不能将值作为 RDD 给出

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

本质上,我想通过 ID 号将一个 RDD 拆分为多个 RDD,并将该 ID 号的值保存到它们自己的位置。

【问题讨论】:

  • 按ID分组后保存文件有什么问题,它们不一定在单独的文件中,但它们不会在文件之间拆分,您可以控制您的分区数create 应该对应于创建的文件数
  • @aaronman 这不起作用,因为我需要拆分原始数据源并根据 ID 号将数据存储在不同的位置。最终会根据id号按需请求数据,是一个非常大的数据集。
  • 如果你按照我建议的方式保存它,RDD肯定可以重新读取数据并通过用户ID获取数据,这是一个可以接受的解决方案
  • 几天前我不得不执行同样的操作并遇到了和你一样的问题。据我所知,没有办法对 RDD 进行分组,然后保留该分组的值而不将它们放入内存中给驱动程序。你考虑过邮件列表吗?如果您发现了什么,请更新此问题,以便我们获取详细信息。
  • @jhappoldt 这绝对不是我想我会回答这个问题的情况

标签: apache-spark filter rdd


【解决方案1】:

我认为这个问题类似于 Write to multiple outputs by key Spark - one Spark job

请参考那里的答案。

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

刚刚看到上面类似的答案,但实际上我们不需要自定义分区。 MultipleTextOutputFormat 将为每个键创建文件。具有相同键的多条记录落入同一个分区是可以的。

new HashPartitioner(num),这里的num是你想要的分区号。如果您有大量不同的键,您可以将 number 设置为 big。在这种情况下,每个分区不会打开太多的 hdfs 文件处理程序。

【讨论】:

  • 是否有 Python 等价物?我不确定在 PySpark (spark.apache.org/docs/latest/api/python/…) 中如何处理 saveAsHadoopFile()
  • 这实际上是否适用于没有 HDFS 的 S3 Native FS?我想知道文件何时会真正上传到 s3,可能是在工作完成时?因为最后 X 条记录可能属于所有 X 文件……所以在处理最后一条记录之前,什么都不能上传到 s3,对吧?
【解决方案2】:

你可以在分组的RDD上直接调用saveAsTextFile,这里它会根据分区保存数据,我的意思是,如果你有4个distinctID,并且你指定groupedRDD的分区数为4,那么spark将每个分区数据存储到一个文件(因此您只能拥有一个文件管理器 ID)您甚至可以将数据视为文件系统中每个 ID 的可迭代对象。

【讨论】:

    【解决方案3】:

    这将保存每个用户 ID 的数据

    val mapRdd = rdd.map{ x=> (x.split("\\t+")(1),
    x)}.groupByKey(numPartitions).saveAsObjectFile("file")
    

    如果您需要根据用户 ID 再次检索数据,您可以执行类似的操作

    val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory  
    val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one  
    

    请注意,在这种情况下没有特别的理由保存到文件中,因为 OP 要求它,所以我只是这样做了,据说保存到文件确实允许您在初始分组后的任何时间加载 RDD已经完成了。

    最后一件事,lookup 比访问 id 的过滤方法更快,但如果您愿意从 spark 发出拉取请求,您可以查看this answer 以获得更快的方法

    【讨论】:

    • 这将以 (Key, Seq(Values)) 的形式序列化分组,并且询问如何分组然后仅保留 Seq(Values) 的问题。您是否知道一种在不将值收集到驱动程序的情况下并行保存值的方法?谢谢!
    • @jhappoldt 这是并行的,为什么你只想要Seq(Values) 然后你会丢失用户信息?
    猜你喜欢
    • 1970-01-01
    • 2019-06-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多