【问题标题】:Scala - map a value from a Map to another MapScala - 将一个值从一个映射映射到另一个映射
【发布时间】:2016-04-07 06:10:19
【问题描述】:

我需要将我的 RDD 中的一个字段映射到另一个映射 UserDAO.users 的另一个字段我试图找出这里的映射,但还不能返回 username。当我进行 foreach 打印 scala.concurrent.impl.Promise$DefaultPromise@7c4c5ddd

时,我在更新的地图中得到了这个

这是我的代码 sn-p:

 rdd.map { l => {
      l.map { case (k, v) => {
        k match {
          case "a_userid" => {
            l.updated("a_username", userDAO.users.map(c => c.filter(f => f.userid == v.toInt)).map(y => y.map(e => e.username)))
          }
          case _ =>
            }
          }
        }
      }
    }

基本上,

rdd - RDD[Map[String, String]]

UserDAO.users - Future[Seq[User]] - 其中 User 是一个案例类

并返回更新后的rdd - RDD[Map[String, String]]

--

知道如何解决这个问题吗?

谢谢

【问题讨论】:

  • 您能指定变量的类型吗?具体来说,rdduserDAO.users。也请给出预期结果的类型。
  • @Aivean 更新了我的问题,谢谢
  • 好的,下一个问题,userDAO.users 返回的用户序列有多大?有什么理由在工作节点上调用它(通过rdd.map)而不是事先实现它?
  • @Aivean: userDAO.users 少于 2000 条记录,但我的 rdd 大约是 1100 万条记录。实现userDAO.users 可能会起作用,但认为可以在不阻塞的情况下做到这一点?
  • 如果你期待具体的结果,你必须在某个地方等待(阻塞)。否则你会怎么想?最后有RDD[Future[Map[String, String]]] 或者Future[RDD[Map[String, String]]]

标签: scala scala-collections


【解决方案1】:

我已重写您的代码以使其正常工作。请注意,它涉及阻塞,否则没有其他方法可以得到具体的RDD[Map[String, String]]

为了清楚起见,我省略了rdd.map 部分。

第一个变体。我使用了您在map 中阅读用户的方法。请注意,这是非常低效的,因为每次迭代都会读取所有用户,即 1100 万次:

// rdd.map ommitted
l.get("a_userid").flatMap {
  userId:String =>
    val newUserName:Option[String] =
      Await.result(userDAO.users
        .map(c => c.find(f => f.userid == userId.toInt))
        .map(y => y.map(e => e.username)),
        30 seconds
      )
    newUserName.map(l.updated("a_username", _))
}.getOrElse(l)

另一种方法是预先让用户阅读地图。然后该地图将广播给所有 spark 工作人员。因为你的地图不是很大,没关系。这种方法效率更高,因为您每次迭代只需在 RDD 上执行单个地图查找,速度很快。

val users:Map[Int, String] =  Await.result(userDAO.users
  .map(uss => uss.map(u => u.userid -> u.username).toMap),
  30 seconds
)

// rdd.map ommitted
l.get("a_userid").flatMap {
  userId:String =>
    users.get(userId.toInt).map(l.updated("a_username", _))
}.getOrElse(l)

UPD:为了完整起见,这里是另一个异步变体:

userDAO.users
  .map(uss => uss.map(u => u.userid -> u.username).toMap)
  .map { users:Map[Int, String] =>
      rdd.map { l:Map[String, String] =>
        l.get("a_userid").flatMap {
          userId:String =>
            users.get(userId.toInt).map(l.updated("a_username", _))
        }.getOrElse(l)
      }
  }

它遵循与variant2相同的方法,但返回Future[RDD[Map[String, String]]]而不是具体结果。

【讨论】:

  • 我想了很多,但很高兴收到您的意见,谢谢。
猜你喜欢
  • 1970-01-01
  • 2019-10-07
  • 1970-01-01
  • 1970-01-01
  • 2020-05-13
  • 1970-01-01
  • 1970-01-01
  • 2015-11-23
相关资源
最近更新 更多