【Title】: scala - spread rdd using map list
【Posted】: 2019-03-11 09:47:19
【Description】:

I want to spread (flatten) an RDD using its map field.

A sample input is:

Log("key1", "key2", "key3", Map(tk1 -> tv1, tk2 -> tv2, tk3 -> tv3))

The output I want is:

RDD[(String, String, String, String, String)]
("key1", "key2", "key3", "tk1", "tv1")
("key1", "key2", "key3", "tk2", "tv2")
("key1", "key2", "key3", "tk3", "tv3")

Finally, I want to run a reduce operation like the one below, but it does not work.

val mapCnt = logs.map(log => {
  log.textMap.foreach { tmap =>
    var tkey = tmap._1
    var tvalue = tmap._2
  }
  ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)

This is the input case class I use:

case class Log(
            val key1: String,
            val key2: String,
            val key3: String,
            val TextMap: Map[String, String]
          ) 

How can I transform it?

Thanks for your help.

【Comments】:

    Tags: scala apache-spark foreach rdd


    【Solution 1】:

    You compute the results inside `foreach` and immediately discard them. On top of that, `tkey` and `tvalue` are out of scope where you build the tuple. `flatMap` is the better fit here (note it should iterate over `log.TextMap`):

    val mapCnt = logs.flatMap(log => {
      for {
        (tkey, tvalue) <- log.TextMap  // iterate the map entries; both names stay in scope
      } yield ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
    }).reduceByKey(_ + _)
    
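    The same flatMap-then-reduce logic can be checked without a Spark cluster by running it on a plain `List` in place of the RDD. This is a minimal sketch: `FlattenDemo` and `countFlattened` are names chosen here for illustration, and `groupBy` plus `sum` stands in for `reduceByKey(_ + _)`.

```scala
// Plain-Scala sketch (no Spark): Log mirrors the asker's case class.
case class Log(key1: String, key2: String, key3: String, TextMap: Map[String, String])

object FlattenDemo {
  // Expand each log into one ((key1, key2, key3, tkey, tvalue), 1L) pair per
  // map entry, then sum the counts per tuple -- the local analogue of
  // rdd.flatMap(...).reduceByKey(_ + _).
  def countFlattened(logs: List[Log]): Map[(String, String, String, String, String), Long] =
    logs
      .flatMap { log =>
        log.TextMap.map { case (tkey, tvalue) =>
          ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
        }
      }
      .groupBy(_._1)                                  // group identical 5-tuples
      .map { case (k, vs) => (k, vs.map(_._2).sum) }  // sum their 1L counts
}
```

    On an actual RDD the pairs produced by the `flatMap` feed straight into `reduceByKey(_ + _)`; the local version only differs in how the aggregation is spelled.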

    【Discussion】:

      【Solution 2】:

      I am not sure about the second part (the `reduceByKey`), but below is a DataFrame solution for the first part.

      scala> case class Log(
           |             val key1: String,
           |             val key2: String,
           |             val key3: String,
           |             val TextMap: Map[String, String]
           |           )
      defined class Log
      
      scala> val df = Seq(Log("key1", "key2", "key3", Map("tk1" -> "tv1", "tk2" -> "tv2", "tk3" -> "tv3"))).toDF().as[Log]
      df: org.apache.spark.sql.Dataset[Log] = [key1: string, key2: string ... 2 more fields]
      
      scala> val df2 = df.withColumn("mapk",map_keys('TextMap))
      df2: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]
      
      scala> val df3 = df2.select('key1,'key2,'key3,'TextMap,'mapk, explode('mapk).as("exp1")).withColumn("exp2",('Textmap)('exp1)).drop("TextMap","mapk")
      df3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]
      
      scala> df3.show
      +----+----+----+----+----+
      |key1|key2|key3|exp1|exp2|
      +----+----+----+----+----+
      |key1|key2|key3| tk1| tv1|
      |key1|key2|key3| tk2| tv2|
      |key1|key2|key3| tk3| tv3|
      +----+----+----+----+----+
      
      
      scala> df3.printSchema
      root
       |-- key1: string (nullable = true)
       |-- key2: string (nullable = true)
       |-- key3: string (nullable = true)
       |-- exp1: string (nullable = true)
       |-- exp2: string (nullable = true)
      
      
      

      【Discussion】:
