【问题标题】:Batching within an Apache Spark RDD map在 Apache Spark RDD 映射中进行批处理
【发布时间】:2014-09-13 22:36:18
【问题描述】:

我有一种情况,当给定批处理时,底层函数的运行效率显着提高。我有这样的现有代码:

// subjects: RDD[Subject]
val subjects = Subject.load(job, sparkContext, config)
val classifications = subjects.flatMap(subject => classify(subject)).reduceByKey(_ + _)
classifications.saveAsTextFile(config.output)

classify 方法适用于单个元素,但对元素组进行操作会更有效。我考虑使用coalesce 将RDD 拆分为多个块并将每个块作为一个组进行操作,但是这样做有两个问题:

  1. 我不确定如何返回映射的 RDD。
  2. classify 事先不知道组应该有多大,它会根据输入的内容而有所不同。

关于如何在理想情况下调用 classify 的示例代码(输出很混乱,因为它不会溢出非常大的输入):

def classifyRdd (subjects: RDD[Subject]): RDD[(String, Long)] = {
  val classifier = new Classifier
  subjects.foreach(subject => classifier.classifyInBatches(subject))
  classifier.classifyRemaining
  classifier.results
}

这样classifyInBatches 内部可以有这样的代码:

def classifyInBatches(subject: Subject) {
  if (!internals.canAdd(subject)) {
    partialResults.add(internals.processExisting)
  }
  internals.add(subject) // Assumption: at least one will fit.
}

我可以在 Apache Spark 中做什么来允许类似这样的行为?

【问题讨论】:

    标签: classification apache-spark rdd


    【解决方案1】:

    尝试使用mapPartitions 方法,该方法允许您的映射函数将分区用作迭代器并生成输出迭代器。

    你应该可以这样写:

    subjectsRDD.mapPartitions { subjects =>
      val classifier = new Classifier
      subjects.foreach(subject => classifier.classifyInBatches(subject))
      classifier.classifyRemaining
      classifier.results
    }
    

    【讨论】:

    • 我假设subjectsRDD 可以使用合并分割成可管理的块?还是有更好的选择?
    • 您想要更大还是更小的分区? coalesce 减少 RDD 中的分区数。如果您想对较小的批次进行分类,您可以在某个上游步骤中创建更多分区,repartitionsubjectRDD(我不推荐这样做,因为这需要随机播放),或者在您的 mapPartitions 中添加第二层批处理调用(例如,通过在 subjects 迭代器上调用 .grouped())。
    猜你喜欢
    • 2014-12-18
    • 1970-01-01
    • 2016-01-06
    • 1970-01-01
    • 1970-01-01
    • 2016-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多