【问题标题】:Spark: how to group together one column of elements based on other two columns of elements in an RDDSpark:如何根据RDD中的其他两列元素将一列元素组合在一起
【发布时间】:2019-07-26 16:33:59
【问题描述】:

我有一个包含 3 列(road_idx、snodeidx、enodeidx)的 RDD。 它看起来像这样:

(roadidx_995, 1138, 1145)
(roadidx_996, 1138, 1139)
(roadidx_997, 2740, 1020)
(roadidx_998, 2762, 2740)
(roadidx_999, 3251, 3240)
.........

如何将具有共同的 snodeidx 或 enodeidx 之一的 road_idx 组合在一起?给每组一个从 1 开始的数字。

预期输出:

(1,[roadidx_995,roadidx_996])
(2,[roadidx_997,roadidx_998])
(3,[roadidx_999])

如上图,

roadidx_995 和 roadidx_996 具有相同的 snodeidx 1138。

roadidx_997 的 snodeidx 与 roadidx_998 的 enodeidx 相同,即 2740。

roadidx_999 单独在一个组中。

Scala 代码或 Python 代码都可以。只要你能告诉我使用 RDD API 得到预期输出的逻辑即可。

非常感谢!

【问题讨论】:

  • 您好,您找到解决方案了吗?
  • @AlexandrosBiratsis 不,我改变了方法。仍然找不到节省计算成本的方法
  • 您好,我认为您需要找到另一种对齐数据的方法。您目前表示数据的方式非常复杂,这就是您找不到解决方案的原因。尝试以不同的方式拆分数据,并找到表达实体之间关系的替代方法。

标签: scala apache-spark pyspark rdd


【解决方案1】:

可以实现为:

  1. 在两个 rdd 上拆分原始文件 - 按“开始”和“结束”节点分组。
  2. 将原始数据集与 1) 中的值连接几次,得到四列,如:

    |------------------|----------------|--------------|----------------|
    | start join start | start join end | end join end | end join start |
    |------------------|----------------|--------------|----------------|
    
  3. 将四列中的值合二为一

在Scala上可以实现:

val data = List(
  ("roadidx_995", 1138, 1145),
  ("roadidx_996", 1138, 1139),
  ("roadidx_997", 2740, 1020),
  ("roadidx_998", 2762, 2740),
  ("roadidx_999", 3251, 3240)
)
val original = sparkContext.parallelize(data)

val groupedByStart = original.map(v => (v._1, v._2)).groupBy(_._2).mapValues(_.map(_._1))
val groupedByEnd = original.map(v => (v._1, v._3)).groupBy(_._2).mapValues(_.map(_._1))
val indexesOnly = original.map(allRow => (allRow._2, allRow._3))

// join by start value
val startJoinsStart = indexesOnly.keyBy(_._1).join(groupedByStart)
val startJoinsEnd = startJoinsStart.leftOuterJoin(groupedByEnd)

// join by end value
val endKeys = startJoinsEnd.values.keyBy(_._1._1._2)

val endJoinsEnd = endKeys.join(groupedByEnd)
val endJoinsStart = endJoinsEnd.leftOuterJoin(groupedByStart)

// flatten to output format
val result = endJoinsStart
  .values
  .map(v => (v._1._1._1._2, v._1._1._2, v._1._2, v._2))
  .map(v => v._1 ++ v._2.getOrElse(Seq()) ++ v._3 ++ v._4.getOrElse(Seq()))
  .map(_.toSet)
  .distinct()

result.foreach(println)

输出是:

Set(roadidx_995, roadidx_996)
Set(roadidx_998, roadidx_997)
Set(roadidx_999)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-13
    • 1970-01-01
    相关资源
    最近更新 更多