【问题标题】:Scala : Map and Flatmap on RDDScala:RDD 上的地图和平面图
【发布时间】:2016-08-31 22:22:42
【问题描述】:

我有一个具有这种结构的 RDD

     RDD[((String, String), List[(Int, Timestamp, String)])]

和数据

    ((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)))
    ((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101)))
    ((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101), (2,2011-10-05 00:00:00.0,C101), (3,2006-12-25 00:00:00.0,C101)))

将此视为表格的意思

   '(D2,Saad Arif)' 

就像钥匙和

    'List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)' 

就像这个键的行。 现在我想检查每一行,如果在两年或更长时间之前有代码为“C101”的记录(历史),则将级别设置为 2,否则设置为 1。所以生成的 RDD 应该如下所示

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101, 1), (5,2010-01-27 00:00:00.0,C101, 1)))
((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101, 1)))
((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101, 2), (2,2011-10-05 00:00:00.0,C101, 2), (3,2006-12-25 00:00:00.0,C101, 1)))

注意时间戳之后的新级别。我如何使用地图或平面地图做到这一点?

【问题讨论】:

  • 你了解mapflatMap的区别吗?这显然是map 的一个用例。
  • 另外...请查看您过去的问题。如果有人正确回答了您的问题,请务必通过将他的答案标记为已接受来感谢该人的努力。
  • @Sarvesh Kumar Singh 是的,我对地图和平面地图有基本的了解,但我不知道如何在这种情况下使用。
  • @Sarvesh Kumar Singh 我已将答案标记为已接受。

标签: scala apache-spark


【解决方案1】:
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.Period    


val df1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")

val futureDate = LocalDate.parse("2100-01-01 00:00:00.0", df1)

val yourRequiredRdd = yourRdd
  .map({
    case (t, list) => {
      val list1 = list.map({
        case (id, dateStr, id2) => (id, LocalDate.parse(dateStr, df1), id2) 
      })

      val oldestDate = list1
        .filter({ case (id, date, id2) => id2.equals("C101") })
        .map(_._2)
        .foldLeft(futureDate)((oldestDate, date) => {
          val period = Period.between(oldestDate, date)
          if (!period.isNegative()) oldestDate else date
        })

      val newList = list1
        .map({
          case (id, date, "C101") => {
            val periodFromOldestDate = Period.between(oldestDate, date)
            val extraNumber = if (periodFromOldestDate.getYears() >= 2) 2 else 1
            (id, date, "C101", extraNumber)
          }
          case (id, date, id2) => {
            (id, date, id2, 1)
          }
        })

      (t, newList)
    }
  })
  .flatMap({
    case ((pid, name), list) => list.map({
      case (id, date, code, level) => (id, name, code, pid, date, level)
    })
  })

【讨论】:

  • 非常感谢。你能简单解释一下找到'oldestDate'的代码吗?
  • 过滤掉除C101 以外的条目,然后映射以仅保留日期。现在你只有一个日期列表。现在折叠该列表以查找最早的日期。
  • requiredRDD 有这个结构 'RDD[((String, String), List[(Int, LocalDate, String, Int)])]' 我如何将它映射到 'RDD[(Int, String, String, String, LocalDate, Int)]' by something using map { case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level) }
  • 添加到答案中。
  • 谢谢我需要那个
猜你喜欢
  • 1970-01-01
  • 2020-12-19
  • 2017-07-30
  • 2017-02-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-16
相关资源
最近更新 更多