【问题标题】:Map key, value pair based on similarity of their value in Spark基于 Spark 中值的相似性映射键、值对
【发布时间】:2015-10-22 00:23:39
【问题描述】:

我已经学习 Spark 几个星期了,目前我正在尝试根据他们在 Scala 中使用 Spark 和 Hadoop 的连接对几个项目或人员进行分组。例如,我想看看足球运动员是如何根据他们的俱乐部历史联系起来的。我的“玩家” rdd 将是:

(John, FC Sion)
(Mike, FC Sion)
(Bobby, PSV Eindhoven)
(Hans, FC Sion)

我想要这样的rdd:

(John, <Mike, Hans>)
(Mike, <John, Hans>)
(Bobby, <>)
(Hans, <Mike, John>)

我打算用地图来完成这个。

val splitClubs = players.map(player=> (player._1, parseTeammates(player._2, players)))

其中 parseTeamates 是一个函数,它可以找到同时为同一俱乐部效力的球员(球员._2)

// RDD is not a type, how can I insert rdd into a function?
def parseTeammates(club: String, rdd: RDD) : List[String] = {
    // will generate a list of players that contains same "club" value
    val playerList = rdd.filter(_._1 == club)
    return playerList.values;
}

我收到编译错误,类型不匹配,因为函数应该返回 List[String],但 playerList.values 返回 org.apache.spark.rdd.RDD[List[String]]。任何人都可以帮助我以简单的形式(在我的例子中为 List[String])获取 RDD 的值吗?

另外,我认为有一种更优雅的方式来解决这个问题,而不是创建一个单独的 RDD,然后在新的 RDD 中找到某个键​​,然后将值作为列表返回

【问题讨论】:

  • 社区检测算法有好几种。在这里讨论它们太长了,完全超出了 SO 的范围。你想关注什么?您尝试实现的算法是什么?
  • 我正在尝试使用 map 和 reduce 与 Spark 和 Hadoop 以 Scala 语言实现它
  • 算法是什么? Map reduce 不是一种算法,它是一种源自函数式编程的编程范式。
  • 你能添加你想要实现的伪代码吗?您的描述质量低下,听起来好像您在要求我们为您做作业。
  • 我更改了我的代码,现在它包含了我正在尝试实现的代码

标签: scala apache-spark key-value keyvaluepair


【解决方案1】:

我认为您的 parseTeammates 方法在 RDD 的世界中有点过时。当涉及到处理 RDD 和可能真的非常大量的数据时,你不想做这种嵌套循环。请尝试重新组织您的数据。

下面的代码会给你你想要的

players.map{case(player, club) => (club, List(player))}
   .reduceByKey(_++_)
   .flatMap{case(_, list) =>list.zipWithIndex.map{case(player, index) => (player, list.take(index) ++ list.drop(index+1))}}

请注意,我首先根据他们效力的俱乐部组织数据,然后将球员组合起来以产生您正在寻找的格式的结果。

我希望这会有所帮助。

【讨论】:

    【解决方案2】:

    对@Glennie 的解决方案的不同看法(谁是 IMO 对您的初始方法不合适的说法)。

    TL;DR;

    players.map { case (player, team) => (team, mutable.HashSet[String](player)) }
      .reduceByKey(_++=_)
      .flatMap {
          case (team, players) => {
            for (player <- players)
              yield (player, players - player)
          }
      }
    

    基本思路是一样的(建立一个以团队为关键字的teammattes列表,flatMap这个结果)。但我建议使用其他构建块来获得相同的结果。这是否成功取决于品味和数据集的性能特征。

    reduceByKey 的不同看法

    在这里,按键归约涉及将(玩家的)集合与一个或多个玩家连接起来。 如果我们取原来的代码:

    players.map{case(player, club) => (club, List(player))}
       .reduceByKey(_++_)
    

    在内部,我们最终会调用类似的东西(从 scala 1.4 开始):

    def add: (List[String], List[String]) => List[String] = _++_;
    
    players.map { case (player, team) => (team, List(player)) }
           .combineByKey(
               // The first time we see a new team on each partition
               (list: List[String]) => list, 
               // invoked each time we fusion a player in its team's list
               // (e.g. map side combine) 
               add, 
               // invoked each time we fusion each team's partial lists
               // (e.g. reduce side combine)
               add)
    

    这里的要点是add 操作(_++_)被调用了很多次。所以最好优化一下。
    在这方面,我们知道List 表现不佳,因为每个突变都需要将现有列表整个复制到另一个列表中。请注意:“差”实际上可能无关紧要。如果您有数百万个团队,并且每个团队只有 20 名玩家,那么 ++ 的性能可能会与缩减中涉及的其他 spark 计算相形见绌。

    (在我的脑海中,更糟糕的是:如果List 变得非常大,看到它的序列化中涉及的一些操作是递归实现的,我们可能会遇到堆栈溢出。我必须检查一下那个)。

    所以我们可能会从切换到可变集合中受益,如下所示:

    players.map { case (player, team) => (team, mutable.ArrayBuffer[String](player)) }
      .reduceByKey(_++=_)
    
    1. 我们现在有一个可变集合,针对它优化了连接
    2. 我们使用++= 而不是++,这样每次融合两个现有集合时,我们甚至都不需要分配一个全新的集合
    3. 如果我们对数据集了解得很好,我们可能能够预先调整缓冲区的大小以实现可预测的内存分配,并尽可能避免调整缓冲区大小。或相应地切换实施。

    flatMap 的不同看法

    使用可变集合的好处

    最初的实现再次使用了广泛的列表操作,例如 takedrop,并结合了带索引的 zip。

    使用可变集合在可读性方面为我们提供了更好的服务,因为我们可以替换 3 个不可变列表副本(takedrop++):

    list.take(index) ++ list.drop(index+1)
    

    只有一个(- 执行克隆)

    list - list(index)
    

    替代语法

    我们还可以提供一个完全不同的实现,避免使用索引压缩来利用理解:

      .flatMap {
          case (team, players) => {
            for (player <- players)
              yield (player, players - player)
          }
        }
    

    请注意,players - player 步骤涉及在列表中查找玩家。使用ArrayBuffer,这是一个 O(n) 操作。因此,如果我们沿着这条路走,我们可能会再次根据数据集,更喜欢使用 mutable.HashSet 作为可变集合而不是数组缓冲区。

    (我打算添加 如果我们的球员姓名没有重复,但这没关系,因为如果你的团队中有两个“约翰”,那么它是没有用的在你的 RDD 中有两行代表两个 John,它的意义不在于单行。)

    【讨论】:

      猜你喜欢
      • 2015-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-28
      相关资源
      最近更新 更多