【问题标题】:SPARK code for sql case statement and row_number equivalentsql case 语句和 row_number 等效项的 SPARK 代码
【发布时间】:2015-08-01 23:35:03
【问题描述】:

我有一个如下的数据集

hduser@ubuntu:~$ hadoop fs -cat /user/hduser/test_sample/sample1.txt
Eid1,EName1,EDept1,100
Eid2,EName2,EDept1,102
Eid3,EName3,EDept1,101
Eid4,EName4,EDept2,110
Eid5,EName5,EDept2,121
Eid6,EName6,EDept3,99

我想使用 spark 代码生成如下输出

Eid1,EName1,IT,102,1
Eid2,EName2,IT,101,2
Eid3,EName3,IT,100,3
Eid4,EName4,ComSc,121,1
Eid5,EName5,ComSc,110,2
Eid6,EName6,Mech,99,1

相当于下面的SQL

选择 emp_id, emp_name, case when emp_dept='EDept1' then 'IT' when emp_dept='EDept2' then 'ComSc' when emp_dept='EDept3' then 'Mech' end dept_name, emp_sal, row_number() over (partition by emp_dept order by emp_sal desc) as rn from emp

有人可以建议我应该如何在 spark 中获得它。

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    你可以使用RDD.zipWithIndex,然后将其转换为DataFrame,然后使用min()和join得到你想要的结果。

    像这样:

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    // SORT BY added as per comment request
    val test = sc.textFile("/user/hadoop/test.txt")
      .sortBy(_.split(",")(2)).sortBy(_.split(",")(3).toInt)
    
    // Table to hold the dept name lookups
    val deptDF = 
      sc.parallelize(Array(("EDept1", "IT"),("EDept2", "ComSc"),("EDept3", "Mech")))
      .toDF("deptCode", "dept")
    
    val schema = StructType(Array(
      StructField("col1", StringType, false),
      StructField("col2", StringType, false),
      StructField("col3", StringType, false),
      StructField("col4", StringType, false),
      StructField("col5", LongType, false))
    )
    
    // join to deptDF added as per comment
    val testDF = sqlContext.createDataFrame(
      test.zipWithIndex.map(tuple => Row.fromSeq(tuple._1.split(",") ++ Array(tuple._2))),
      schema
    )
    .join(deptDF, $"col3" === $"deptCode")
    .select($"col1", $"col2", $"dept" as "col3", $"col4", $"col5")
    .orderBy($"col5")
    
    testDF.show
    
    col1 col2   col3  col4 col5
    Eid1 EName1 IT    100  0
    Eid3 EName3 IT    101  1
    Eid2 EName2 IT    102  2
    Eid4 EName4 ComSc 110  3
    Eid5 EName5 ComSc 121  4
    Eid6 EName6 Mech  99   5
    
    val result = testDF.join(
      testDF.groupBy($"col3").agg($"col3" as "g_col3", min($"col5") as "start"),
      $"col3" === $"g_col3"
    )
    .select($"col1", $"col2", $"col3", $"col4", $"col5" - $"start" + 1 as "index")
    
    result.show
    
    col1 col2   col3   col4 index
    Eid4 EName4 ComSc 110  1
    Eid5 EName5 ComSc 121  2
    Eid6 EName6 Mech  99   1
    Eid1 EName1 IT    100  1
    Eid3 EName3 IT    101  2
    Eid2 EName2 IT    102  3
    

    【讨论】:

    • 感谢大卫的回复。但结果 col4 没有按 desc 顺序排序。能否请您也排序一下。
    • 大卫也没有在您的解决方案中处理案例部分
    • 错过了排序。你有完整的部门名称列表吗?
    • 不,我没有。实际上,这已经通过 sql 中的 case 语句处理(意味着如果 EDept1 则硬编码为 IT 等)。我们可以像这样在 spark 中写任何条件语句吗?
    • 出现错误:可能是缺少 scala 的大括号> val test = sc.textFile("/user/hduser/test_sample/sample1.txt").sortBy(.split(",") (2).sortBy(.split(",")(3).toInt | ) | ) :27: error: value split is not a member of Char 应用程序涉及默认参数时发生错误. val test = sc.textFile("/user/hduser/test_sample/sample1.txt").sortBy(.split(",")(2).sortBy(.split(",") (3).toInt
    猜你喜欢
    • 1970-01-01
    • 2015-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-24
    • 2015-10-01
    • 2014-07-17
    • 1970-01-01
    相关资源
    最近更新 更多