【Title】: Create an array column from other columns after processing the column values
【Posted】: 2018-11-14 03:50:58
【Question】:

Suppose I have a Spark DataFrame with categorical columns (School, Type, Group):

------------------------------------------------------------
StudentID  |  School |   Type        |  Group               
------------------------------------------------------------
1          |  ABC    |   Elementary  |  Music-Arts          
2          |  ABC    |   Elementary  |  Football            
3          |  DEF    |   Secondary   |  Basketball-Cricket  
4          |  DEF    |   Secondary   |  Cricket             
------------------------------------------------------------

I need to add one more column to the DataFrame, as shown below:

--------------------------------------------------------------------------------------
StudentID  |  School |   Type        |  Group               |  Combined Array
---------------------------------------------------------------------------------------
1          |  ABC    |   Elementary  |  Music-Arts          | ["School: ABC", "Type: Elementary", "Group: Music", "Group: Arts"]
2          |  ABC    |   Elementary  |  Football            | ["School: ABC", "Type: Elementary", "Group: Football"]
3          |  DEF    |   Secondary   |  Basketball-Cricket  | ["School: DEF", "Type: Secondary", "Group: Basketball", "Group: Cricket"]
4          |  DEF    |   Secondary   |  Cricket             | ["School: DEF", "Type: Secondary", "Group: Cricket"]
----------------------------------------------------------------------------------------

The extra column is a combination of all the categorical columns, but with special handling for the "Group" column: its values need to be split on "-".

All the categorical columns, including "Group", are provided in a list. The "Group" column is also passed in separately, as a string, as the column to be split. The DataFrame has other columns that are not used.

I am looking for the best-performing solution.

If it were a simple array, it could be done with a single 'withColumn' transformation:

val columns = List("School", "Type", "Group")
var df2 = df1.withColumn("CombinedArray", array(columns.map(df1(_)):_*))

However, because of the extra processing needed for the "Group" column, the solution does not appear to be straightforward here.
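To make the target concrete, here is a minimal plain-Scala sketch (no Spark) of the per-row transformation being asked for; the column names and the "Group" special case come from the question, while the helper name is hypothetical:

```scala
// Per-row logic: prefix each categorical value with its column name,
// and split the Group value on "-" into one entry per part.
def combinedArray(school: String, typ: String, group: String): Seq[String] =
  Seq(s"School: $school", s"Type: $typ") ++
    group.split("-").map(g => s"Group: $g")

// combinedArray("ABC", "Elementary", "Music-Arts")
// → Seq("School: ABC", "Type: Elementary", "Group: Music", "Group: Arts")
```

The point is that the result length varies per row, which is why a fixed-arity `array(...)` of the columns is not enough on its own.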

【Comments】:

  • Just to be sure: why do you want the redundant information in the combined column? I see why you would want an array containing the "-"-split of Group, but I am not so sure about the other values. I'd suggest df.withColumn("combined", split($"Group", "-"))
  • The column will be fed to a CountVectorizer, so each entry of the array (Category: value) will be identified distinctly. For instance, the same value may appear under different categories.
  • Ah, I see; then stack0114106 has the right answer, if you add the Group split to it. ;)
  • If you don't want to wrangle too much with string concats to add an identifying prefix for the different kinds of information (which could be a bit annoying for the Group category), you could also just do: df.withColumn("combined", split($"Group", "-")).withColumn("SchoolArray", array($"School")).withColumn("TypeArray", array($"Type")) and apply 3 CountVectorizers, one per "XYZArray", plus a final VectorAssembler to put everything together. The nice thing about this version is that you can define different minimum frequencies for each of the CountVectorizers.

Tags: scala apache-spark apache-spark-sql rdd


【Solution 1】:

Using spark.sql(), check this out:

Seq(("ABC","Elementary","Music-Arts"),("ABC","Elementary","Football"),("DEF","Secondary","Basketball-Cricket"),("DEF","Secondary","Cricket"))
  .toDF("School","Type","Group").createOrReplaceTempView("taba")
spark.sql( """ select school, type, group, array(concat('School:',school),concat('type:',type),concat('group:',group)) as combined_array from taba """).show(false)

Output:

+------+----------+------------------+------------------------------------------------------+
|school|type      |group             |combined_array                                        |
+------+----------+------------------+------------------------------------------------------+
|ABC   |Elementary|Music-Arts        |[School:ABC, type:Elementary, group:Music-Arts]       |
|ABC   |Elementary|Football          |[School:ABC, type:Elementary, group:Football]         |
|DEF   |Secondary |Basketball-Cricket|[School:DEF, type:Secondary, group:Basketball-Cricket]|
|DEF   |Secondary |Cricket           |[School:DEF, type:Secondary, group:Cricket]           |
+------+----------+------------------+------------------------------------------------------+

If you need it as a DataFrame, then:

val df = spark.sql( """ select school, type, group, array(concat('School:',school),concat('type:',type),concat('group:',group)) as combined_array from taba """)
df.printSchema()

root
 |-- school: string (nullable = true)
 |-- type: string (nullable = true)
 |-- group: string (nullable = true)
 |-- combined_array: array (nullable = false)
 |    |-- element: string (containsNull = true)

Update:

Constructing the SQL columns dynamically:

scala> val df = Seq(("ABC","Elementary","Music-Arts"),("ABC","Elementary","Football"),("DEF","Secondary","Basketball-Cricket"),("DEF","Secondary","Cricket")).toDF("School","Type","Group")
df: org.apache.spark.sql.DataFrame = [School: string, Type: string ... 1 more field]

scala> val columns = df.columns.mkString("select ", ",", "")
columns: String = select School,Type,Group

scala> val arr = df.columns.map( x=> s"concat('"+x+"',"+x+")" ).mkString("array(",",",") as combined_array ")
arr: String = "array(concat('School',School),concat('Type',Type),concat('Group',Group)) as combined_array "

scala> val sql_string = columns + " , " + arr + " from taba "
sql_string: String = "select School,Type,Group , array(concat('School',School),concat('Type',Type),concat('Group',Group)) as combined_array  from taba "

scala> df.createOrReplaceTempView("taba")

scala> spark.sql(sql_string).show(false)
+------+----------+------------------+---------------------------------------------------+
|School|Type      |Group             |combined_array                                     |
+------+----------+------------------+---------------------------------------------------+
|ABC   |Elementary|Music-Arts        |[SchoolABC, TypeElementary, GroupMusic-Arts]       |
|ABC   |Elementary|Football          |[SchoolABC, TypeElementary, GroupFootball]         |
|DEF   |Secondary |Basketball-Cricket|[SchoolDEF, TypeSecondary, GroupBasketball-Cricket]|
|DEF   |Secondary |Cricket           |[SchoolDEF, TypeSecondary, GroupCricket]           |
+------+----------+------------------+---------------------------------------------------+
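As an aside, the dynamically built `concat` above drops the ':' separator (producing e.g. "SchoolABC"). A plain-Scala check of a corrected fragment builder (no Spark needed, since only the generated SQL string is being inspected):

```scala
// Build the array(...) SQL fragment with the ':' separator kept inside
// the string literal, so each element reads "Column:value".
val cols = Seq("School", "Type", "Group")
val arrExpr = cols
  .map(c => s"concat('$c:',$c)")
  .mkString("array(", ",", ") as combined_array")
// arrExpr == "array(concat('School:',School),concat('Type:',Type),concat('Group:',Group)) as combined_array"
```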


Update 2:

scala>  val df = Seq((1,"ABC","Elementary","Music-Arts"),(2,"ABC","Elementary","Football"),(3,"DEF","Secondary","Basketball-Cricket"),(4,"DEF","Secondary","Cricket")).toDF("StudentID","School","Type","Group")
df: org.apache.spark.sql.DataFrame = [StudentID: int, School: string ... 2 more fields]

scala> df.createOrReplaceTempView("student")

scala>  val df2 = spark.sql(""" select studentid, collect_list(concat('Group:', t.sp1)) as sp2 from (select StudentID,School,Type,explode((split(group,'-'))) as sp1 from student where size(split(group,'-')) > 1 ) t group by studentid """)
df2: org.apache.spark.sql.DataFrame = [studentid: int, sp2: array<string>]

scala> val df3 = df.alias("t1").join(df2.alias("t2"),Seq("studentid"),"LeftOuter")
df3: org.apache.spark.sql.DataFrame = [StudentID: int, School: string ... 3 more fields]

scala> df3.createOrReplaceTempView("student2")

scala> spark.sql(""" select studentid, school,group, type, array(concat('School:',school),concat('type:',type),concat_ws(',',temp_arr)) from (select studentid,school,group,type, case when sp2 is null then array(concat("Group:",group)) else sp2 end as temp_arr from student2) t """).show(false)
+---------+------+------------------+----------+---------------------------------------------------------------------------+
|studentid|school|group             |type      |array(concat(School:, school), concat(type:, type), concat_ws(,, temp_arr))|
+---------+------+------------------+----------+---------------------------------------------------------------------------+
|1        |ABC   |Music-Arts        |Elementary|[School:ABC, type:Elementary, Group:Music,Group:Arts]                      |
|2        |ABC   |Football          |Elementary|[School:ABC, type:Elementary, Group:Football]                              |
|3        |DEF   |Basketball-Cricket|Secondary |[School:DEF, type:Secondary, Group:Basketball,Group:Cricket]               |
|4        |DEF   |Cricket           |Secondary |[School:DEF, type:Secondary, Group:Cricket]                                |
+---------+------+------------------+----------+---------------------------------------------------------------------------+
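The explode/collect_list step in Update 2 can be modeled in plain Scala (no Spark; the sample data and helper names here are illustrative only): students whose Group splits into several parts get one "Group:x" entry per part, and the others fall back to the original value, mirroring the `case when sp2 is null` branch.

```scala
// Model of: explode(split(group,'-')) + collect_list, then left join
// back with a null-coalescing fallback to the unsplit value.
val students = Seq((1, "Music-Arts"), (2, "Football"))
val sp2 = students.collect {
  case (id, g) if g.split("-").length > 1 =>
    id -> g.split("-").map("Group:" + _).toSeq
}.toMap
val temp = students.map { case (id, g) =>
  id -> sp2.getOrElse(id, Seq("Group:" + g))
}
// temp == Seq(1 -> Seq("Group:Music", "Group:Arts"), 2 -> Seq("Group:Football"))
```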


【Comments】:

  • This solution does not address the core problem that the "Group" values need to be split dynamically.
  • Thanks, but the update still doesn't address the core problem here: if you look at my output, the arrays in the first and third rows have size 4. We need to split the "Group" column on '-' and add multiple elements to the array, one per split.
  • @John Subas.. could you check Update 2?
【Solution 2】:

Using a regex to replace the beginning of each field and every '-' in the middle:

val df1 = spark.read.option("header","true").csv(filePath)
val columns = List("School", "Type", "Group")
var df2 = df1.withColumn("CombinedArray", array(columns.map{
   colName => regexp_replace(regexp_replace(df1(colName),"(^)",s"$colName: "),"(-)",s", $colName: ")
}:_*))
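What the nested regexp_replace does to a single Group cell can be demonstrated with plain Java-regex replaceAll (Spark's regexp_replace follows the same java.util.regex semantics for these patterns):

```scala
val colName = "Group"
val cell = "Music-Arts"
  .replaceAll("(^)", s"$colName: ")   // prefix the field: "Group: Music-Arts"
  .replaceAll("(-)", s", $colName: ") // expand each '-' into ", Group: "
// cell == "Group: Music, Group: Arts"
```

Note that this yields one comma-joined string per column, so each slot of `CombinedArray` holds e.g. "Group: Music, Group: Arts" as a single element rather than two separate array elements.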

【Comments】:

  • This will probably work. I need some modifications so that the split is only applied to the selected column among the categorical columns. Will try to work it out and post the answer here.
  • Any reason for not accepting the answer? It works as per the expected output you mentioned.
  • I will accept it once I can work your code into the exact solution. The split is needed for only one column, "Group", not all of them.
  • The following code would be the exact answer: var df2 = df.withColumn("CombinedArray", array(columns.map( colName => { colName match { case "Group" => regexp_replace(regexp_replace(df(colName),"(^)",s"$colName: "),"(-)",s", $colName: ") case _ => regexp_replace(df(colName),"(^)",s"$colName: ") } }):_*))
  • What would this translate to in PySpark?
【Solution 3】:

You need to add an empty column first and then map over the rows like this (in Java):

StructType newSchema = df1.schema().add("Combined Array", DataTypes.StringType);

df1 = df1.withColumn("Combined Array", lit(null))
        .map((MapFunction<Row, Row>) row ->
            RowFactory.create(...values...) // add existing values and new value here
        , newSchema);

It should be very similar in Scala.

【Comments】:
