【问题标题】:Best MapReduce Algorithm to calculate the number of every single overlapped intervals计算每个重叠区间数的最佳 MapReduce 算法
【发布时间】:2018-08-12 23:16:29
【问题描述】:

[a, b] 格式中有数十亿个区间,所有这些区间都会将数字空间切割成多个单块。我打算输出所有单件以及这件作品中重叠间隔的数量。

例如:有3个区间,分别是:[1,7], [2,3], [6, 8]。它应该输出如下结果:

[-∞, 1]: 0

[1, 2]: 1

[2, 3]: 2

[3, 6]: 1

[6, 7]: 2

[7, 8]: 1

[8, +∞]: 0

如果对于单台机器(不是 MapReduce 中的分布式解决方案),我知道解决方案可以将区间实例分解为 start_nend_n,按数字排序并从左到右迭代并使用一个计数器,用于计算当前件的数量并输出。但我不确定如何将这种算法拆分为分布式方式。

有什么建议吗?谢谢。

【问题讨论】:

  • 那么 [8,9] 呢,最后一个应该是 [8, infinity]?
  • 是的,你说得对,错字。已修改,谢谢! @gyan
  • 请检查更新的代码逻辑。如果有帮助,请不要忘记点赞并接受。

标签: java hadoop apache-spark mapreduce distributed-computing


【解决方案1】:

下面是一个有效的 Spark 代码(至少在你的例子中它给出了正确的结果:

由于 2 个笛卡尔积,代码效率不高。

区间比较的条件也可能需要注意:)

请随时改进代码并在此处发布改进后的答案。

import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
  val sc = new SparkContext(conf)

  case class Interval(start : Double, end : Double)

  def main(args: Array[String]): Unit = {

sc.setLogLevel("ERROR")

val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)
val inputRdd = sc.parallelize(input)
val infinitiesRdd = sc.parallelize(infinities)

// Get unique flat boundary values  e.g.: Interval(1, 7) will give 2 boundary values: [1, 7]
val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()
// Additionally we will need negative and positive infinities
val all_boundaries = boundaries.union(infinitiesRdd)

// Calculate all intervals
val intervals = all_boundaries
  // For each interval start get all possible interval ends
  .cartesian(all_boundaries)    // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
  // Filter out invalid intervals (where begin is either less or equal to the end)  e.g.: from previous comment (2, 1) is invalid interval
  .filter(v => v._1 < v._2)     // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
  // Find lesser interval end e.g.: in previous comment (1, 2) -> 2 is smallest value for the same start (1)
  .reduceByKey((a, b) => Math.min(a, b))  // [(1, 2) (2, 3), ...]

// Uncommend this to print intermediate result
// intervals.sortBy(_._1).collect().foreach(println)

// Get counts of overlapping intervals
val countsPerInterval = intervals
  // for each small interval get all possible intput intervals e.g.:
  .cartesian(inputRdd)    // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Filter out intervals that do not overlap
  .filter{ case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2}   // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Since we're not interested in intervals, but only in count of intervals -> change interval to 1 for reduction
  .mapValues(_ => 1)      //[((1, 2), 1), ((1, 2), 1), ...]
  // Calculate a sum per interval
  .reduceByKey(_ + _)   // [((1, 2), 2), ...]

// print result
countsPerInterval.sortBy(_._1).collect().foreach(println)
  }

}

【讨论】:

    【解决方案2】:

    在 mapreduce 中,最简单的方法是将对中的每个数字写入 reducer。 sort shuffle 阶段负责对数字进行排序,reducer 负责修复。

    例如对于输入对[1,7],Mapper 输出将是:

    key: NullWritable  Value: 1
    key: NullWritable  Value: 7
    key: NullWritable  Value: 1_7
    

    使用相同的模式,所有映射器的输出形式将是:

    key: NullWritable  Value: 1
    key: NullWritable  Value: 7
    key: NullWritable  Value: 1_7
    key: NullWritable  Value: 2
    key: NullWritable  Value: 3
    key: NullWritable  Value: 2_3
    key: NullWritable  Value: 6
    key: NullWritable  Value: 8
    key: NullWritable  Value: 6_8
    

    sort-shuffle 步骤会将输出聚合为

    Key: NullWritable  ListOfValue: [1,1_7,2,2_3,3,6,6_8,7,8]
    

    Reducer 遍历值列表(这将是一个有序列表)和

    • 将配对值分离到单独的列表[1_7, 2_3, 6_8]。您可以只检查文本中出现的_ 以找出这对。

    • 如下重新配对空间值。

    [-infinity, 1] [1, 2] [2, 3] [3, 6] [6, 7] [7, 8] [8, +infinity]

    • 重新配对时,只需对照上述列表检查边界即可找到计数。您可以使用“_”拆分该对,并通过parse 函数转换为数字。

    例如-infinity(比如一个非常大的负长 -9999999)超出了所有对范围,因此减速器输出将是

    key:"[-infinity, 1]" (Text Type)value: 0 (IntWritable` type)

    同样适用于pair [1,2], 1&gt;=1 and 2&lt;=7 所以减速器输出

    key:"[1, 2]" (Text 类型)value: 1 (IntWritable` 类型)

    对于 [6,7]6&gt;=1 and 7&lt;=76&gt;=6 and 7&lt;=8 对所以减速器输出

    key:"[1, 2]" (Text 类型)value: 2 (IntWritable` 类型)

    等等……

    注意:NullWritable 是一个Java hadoop API,它仅代表null。除了NullWritable,您可以使用任何常量数据(比如Hadoop Text 类型Writable)。这里的重点是确保由于相同的映射器键,所有映射器输出都应该到达单个减速器。

    【讨论】:

    • 这样,它可以分成预期的部分。但是,reducer 如何计算出 [6, 7] 应该是 2,因为在 [6, 7] 区间上有两个区间重叠。谢谢。
    猜你喜欢
    • 2013-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-29
    • 1970-01-01
    • 1970-01-01
    • 2021-07-01
    • 1970-01-01
    相关资源
    最近更新 更多