【问题标题】:Update columns when iterate over DataFrame迭代 DataFrame 时更新列
【发布时间】:2017-01-19 10:14:37
【问题描述】:

有数据框:

import sqlContext.implicits._

case class TestData(banana: String, orange: String, apple : String, feijoa: String)

var data = sc.parallelize((1 to 5).map(i => TestData("banana="+i.toString,
                    "orange="+i.toString,"apple="+i.toString,"feijoa="+i.toString))).toDF

data.registerTempTable("data")
data.show 

如下所示:

+--------+--------+-------+--------+
|  banana|  orange|  apple|  feijoa|
+--------+--------+-------+--------+
|banana=1|orange=1|apple=1|feijoa=1|
|banana=2|orange=2|apple=2|feijoa=2|
|banana=3|orange=3|apple=3|feijoa=3|
|banana=4|orange=4|apple=4|feijoa=4|
|banana=5|orange=5|apple=5|feijoa=5|
+--------+--------+-------+--------+

另外还有一个sorted列表results

case class result(fruits: Set[String], weight: Double)

val results = List(
         result(Set("banana=1"), 200),
         result(Set("banana=3", "orange=3"), 180),
         result(Set("banana=2", "orange=2", "apple=3"), 170)
 )

我想迭代results,将单个result 与数据框中的行进行比较,如果行contains 是特定的result,则将1 设置在适当的列中

更新:数据框中的每一列只包含一个值,例如banana = 1。由这些值组成的一组result.fruits

1) 我知道如何迭代结果:

(0 to results.size-1)
  .map(i => results(i).fruits)

2) 我知道如何按results 的大小向数据框添加列

data =
(1 to results.size)
 .par
 .foldLeft(data){ case(data,i) => 
  data.withColumn(i.toString(),lit(0) )
}     


+--------+--------+-------+--------+-+-+-+
|  banana|  orange|  apple|  feijoa|1|2|3|
+--------+--------+-------+--------+-+-+-+
|banana=1|orange=1|apple=1|feijoa=1|0|0|0|
|banana=2|orange=2|apple=2|feijoa=2|0|0|0|
|banana=3|orange=3|apple=3|feijoa=3|0|0|0|
|banana=4|orange=4|apple=4|feijoa=4|0|0|0|
|banana=5|orange=5|apple=5|feijoa=5|0|0|0|
+--------+--------+-------+--------+-+-+-+

3) 我需要帮助来了解如何组合 select 函数来检查特定 row 是否包含 result.fruits ,然后在适当的列中将值设置为 1 :首先来自 results 在列#1results 列中的第二个#2

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    尝试这样的事情(给出简单的解决方案,但你可以概括它):

    data = data.withColumn("combined", array($"banana",$"orange", $"apple",$"feijoa"))
    def getFunc(resultSet: Set[String]) = {
        def f(x: Seq[String]): Int = {
            if(resultSet.forall(y=>x.contains(y))) 1 else 0
        }
        udf(f _)
    }
    
    data =(1 to results.size).foldLeft(data){
      (x,i) => x.withColumn(i.toString, getFunc(results(i-1).fruits)($"combined"))
    }  
    

    【讨论】:

      猜你喜欢
      • 2020-05-10
      • 1970-01-01
      • 2022-06-29
      • 1970-01-01
      • 1970-01-01
      • 2018-12-29
      • 2019-09-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多