【问题标题】:Aggregating one RDD according to value of another RDD Spark (Java)根据另一个 RDD Spark (Java) 的值聚合一个 RDD
【发布时间】:2016-02-23 12:44:06
【问题描述】:

我有两个包含时间信息的 RDD。 RDD 被分成不同的分区。 一种是形式

16:00:00
16:00:18
16:00:25
16:01:01
16:01:34
16:02:12
16:02:42
...

和另一个包含 tuple2 形式的时间跨度

<16:00:00, 16:00:59>
<16:01:00, 16:01:59>
<16:02:00, 16:02:59>
...

我需要聚合第一个和第二个 RDD,通过根据第二个中的值聚合第一个 RDD 的值,以获得类似的东西

<<16:00:00, 16:00:59>, [16:00:00,16:00:18,16:00:25]>
<<16:01:00, 16:01:59>, [16:01:01,16:01:34]>
<<16:02:00, 16:02:59>, [16:02:12,16:02:42]>
...

或者,或者,类似的东西

<<16:00:00, 16:00:59>, 16:00:00>
<<16:00:00, 16:00:59>, 16:00:18>
<<16:00:00, 16:00:59>, 16:00:25>
<<16:01:00, 16:01:59>, 16:01:01>
<<16:01:00, 16:01:59>, 16:01:34>
<<16:02:00, 16:02:59>, 16:02:12>
<<16:02:00, 16:02:59>, 16:02:42>
...

我正在尝试使用整个范围的 spark 转换函数,但我很难找到一个适用于这种不同性质的 RDD 的函数。我知道我可能会选择cartesian 产品,然后进行过滤,但我想要一个“更好”的解决方案。我试过zipPartition,这可能有效,但我的分区可能不一致,例如16:00:00 可能最终出现在不存在相应聚合值(元组&lt;16:00:00, 16:00:59&gt;)的分区中。 解决这个问题的最佳方法是什么?

PS:我使用的是 Java,但也欢迎使用 Scala 解决方案。 谢谢

【问题讨论】:

  • 间隔是否总是规律的?

标签: java apache-spark transformation aggregation rdd


【解决方案1】:

我已将以下内容简化为使用整数,但我相信同样可以多次使用。虽然示例是用 Scala 编写的,但我怀疑它也可以用 Java 完成。

如果范围是常规的,我会将“值”RDD 转换为 range,value,然后进行简单的连接。

val values = Seq(1, 5, 10, 14, 20)
val valuesRdd = sc.parallelize(values, 2)
valuesRdd.map(x => (((x/10)*10, ((x/10)*10)+9), x)).collect

但是,如果范围不规则,则:

如果您不介意使用 DataFrames,那么一个选项是使用 用户定义的函数 根据是否 V 在给定范围内创建列并加入该列。

case class Range(low : Int, high :Int)
val ranges = Seq( Range(0,9), Range(10,19), Range(20,29));
val rangesDf = sc.parallelize(ranges, 2).toDF

case class Value(value : Int)
val values = Seq(Value(1), Value(5), Value(10), Value(14), Value(20))
val valuesDf = sc.parallelize(values, 2).toDF

val inRange = udf{(v: Int, low: Int, high : Int) => v >= low && v<= high}

rangesDf.join(valuesDf, inRange(valuesDf("value"), rangesDf("low"), rangesDf("high"))).show

下一个选项是分解范围并加入分解后的版本:

val explodedRange = rangesRdd.map(x => (x, List.range(x._1, x._2 + 1))).flatMap( { case (range, lst) => lst.map { x => (x, range)} })
val valuesRdd = sc.parallelize(values, 2).map(x => (x,true))
valuesRdd.join(explodedRange).map(x => (x._2._2, x._1)).collect

【讨论】:

  • 我试图放弃使用 DataFrame 作为最后的解决方案。您的最后一个选项似乎可行,但我仍然需要每秒扩展范围,显着增加数据数量(可能仍然比cartesian 转换更好)。如果我说我可以在第三行使用leftOuterJoin,我错了吗?最后,我的对象可能是 Calendar 对象(或类似对象):Spark 在比较索引时使用什么(例如在join 中)?是否使用compareTo 方法?
  • 如果您遇到value 可能不存在于任何range 中的情况,那么使用leftOuterJoin 将确保您获得该值。我相信 Spark 在比较连接中的键时使用 .equals。
  • 我正在尝试第三个选项,但显然最后一步的连接存在一些问题。我正在使用一个 MyDate 类来表示一个日期,这样我就可以覆盖等号(我需要它)。 valuesRddexplodedRange 中的索引是 MyDate 的实例。在比较连接中的索引时实际上使用了 Equals,但显然连接不会将valuesRdd 中的每个索引与explodedRange 中的每个索引进行比较。最让我害怕的是,在不同的运行中,连接返回不同的结果(0、1 或 2 条记录的 rdd,而它们应该是 695)。使用单个分区。
  • 我建议您使用一个小的自包含示例创建一个新问题。
猜你喜欢
  • 2016-12-23
  • 2016-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-19
  • 2017-06-17
  • 2016-08-23
相关资源
最近更新 更多