【问题标题】:Split 1 column into 3 columns in spark scala在 spark scala 中将 1 列拆分为 3 列
【发布时间】:2017-01-08 10:04:30
【问题描述】:

我在 Spark 中有一个使用 scala 的数据框,其中有一列需要拆分。

scala> test.show
+-------------+
|columnToSplit|
+-------------+
|        a.b.c|
|        d.e.f|
+-------------+

我需要将此列拆分为如下所示:

+--------------+
|col1|col2|col3|
|   a|   b|   c|
|   d|   e|   f|
+--------------+

我使用的是 Spark 2.0.0

谢谢

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    试试:

    import sparkObject.spark.implicits._
    import org.apache.spark.sql.functions.split
    
    df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
      $"_tmp".getItem(0).as("col1"),
      $"_tmp".getItem(1).as("col2"),
      $"_tmp".getItem(2).as("col3")
    )
    

    这里要注意的重点是sparkObject 是您可能已经初始化的SparkSession 对象。因此,(1) import 语句必须强制内联在代码中,而不是在类定义之前。

    【讨论】:

    • 效果很好,谢谢!
    • 拆分需要什么导入?
    • @Jake import org.apache.spark.sql.functions.split 使用这个
    • @Surendra Pratap 谢谢!我被搜索了 30 分钟 :)
    • import spark.implicits._ 使用 $-notation。
    【解决方案2】:

    避免选择部分的解决方案。当您只想追加新列时,这很有帮助:

    case class Message(others: String, text: String)
    
    val r1 = Message("foo1", "a.b.c")
    val r2 = Message("foo2", "d.e.f")
    
    val records = Seq(r1, r2)
    val df = spark.createDataFrame(records)
    
    df.withColumn("col1", split(col("text"), "\\.").getItem(0))
      .withColumn("col2", split(col("text"), "\\.").getItem(1))
      .withColumn("col3", split(col("text"), "\\.").getItem(2))
      .show(false)
    
    +------+-----+----+----+----+
    |others|text |col1|col2|col3|
    +------+-----+----+----+----+
    |foo1  |a.b.c|a   |b   |c   |
    |foo2  |d.e.f|d   |e   |f   |
    +------+-----+----+----+----+
    

    更新:我强烈建议使用Psidom's implementation 以避免分裂三次。

    【讨论】:

      【解决方案3】:

      要以编程方式执行此操作,您可以使用 (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")) 创建一系列表达式(假设您需要 3 列作为结果),然后使用 : _* 语法将其应用于 select

      df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
          (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
      ).show
      +----+----+----+
      |col0|col1|col2|
      +----+----+----+
      |   a|   b|   c|
      |   d|   e|   f|
      +----+----+----+
      

      保留所有列:

      df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
          col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
      ).show
      +-------------+---------+----+----+----+
      |columnToSplit|     temp|col0|col1|col2|
      +-------------+---------+----+----+----+
      |        a.b.c|[a, b, c]|   a|   b|   c|
      |        d.e.f|[d, e, f]|   d|   e|   f|
      +-------------+---------+----+----+----+
      

      如果您使用的是 pyspark,请使用列表推导替换 scala 中的 map

      df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])
      from pyspark.sql.functions import col, split
      
      (df.withColumn('temp', split('columnToSplit', '\\.'))
         .select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(3))
      ).show()
      +----+----+----+
      |col0|col1|col2|
      +----+----+----+
      |   a|   b|   c|
      |   d|   e|   f|
      +----+----+----+
      

      【讨论】:

      • 我们可以拆分成任意数量的列吗?为什么 3 必须硬编码?
      • 我有类似的情况,但“。”的数量分隔值未知。我们如何动态拆分行?
      • 这里有多个不错的答案,因为它们每个都将事情进一步分解,但是如果您有很多列或很多数据框可以做到这一点(例如从 Kafka 读取许多主题) ,那么这个答案就是要走的路。
      • 第一个答案(不是所有列版本)如何在 pyspark/python 中翻译?
      • 我创建了另一个答案来展示如何在不硬编码列数的情况下实现这种方法。
      【解决方案4】:

      这会将列附加到原始 DataFrame 并且不使用 select,并且只使用临时列拆分一次:

      import spark.implicits._
      
      df.withColumn("_tmp", split($"columnToSplit", "\\."))
        .withColumn("col1", $"_tmp".getItem(0))
        .withColumn("col2", $"_tmp".getItem(1))
        .withColumn("col3", $"_tmp".getItem(2))
        .drop("_tmp")
      

      【讨论】:

        【解决方案5】:

        这扩展了 Psidom 的答案,并展示了如何动态进行拆分,而无需对列数进行硬编码。这个答案运行一个查询来计算列数。

        val df = Seq(
          "a.b.c",
          "d.e.f"
        ).toDF("my_str")
        .withColumn("letters", split(col("my_str"), "\\."))
        
        val numCols = df
          .withColumn("letters_size", size($"letters"))
          .agg(max($"letters_size"))
          .head()
          .getInt(0)
        
        df
          .select(
            (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
          )
          .show()
        

        【讨论】:

          【解决方案6】:

          我们可以在 Scala 中使用 for 和 yield 来编写:-

          如果您的列数超过,只需将其添加到所需的列并使用它。 :)

          val aDF = Seq("Deepak.Singh.Delhi").toDF("name")
          val desiredColumn = Seq("name","Lname","City")
          val colsize = desiredColumn.size
          
          val columList = for (i <- 0 until colsize) yield split(col("name"),".").getItem(i).alias(desiredColumn(i))
          
          aDF.select(columList: _ *).show(false) 
          

          输出:-

          +------+------+-----+--+
          |name  |Lname |city |
          +-----+------+-----+---+
          |Deepak|Singh |Delhi|
          +---+------+-----+-----+
          

          如果您不需要 name 列,则删除该列并使用 withColumn。

          【讨论】:

            【解决方案7】:

            示例: 不使用 select 语句。

            假设我们有一个包含一组列的数据框,并且我们想要拆分列名称为 name

            的列
            import spark.implicits._
            
            val columns = Seq("name","age","address")
            
            val data = Seq(("Amit.Mehta", 25, "1 Main st, Newark, NJ, 92537"),
                         ("Rituraj.Mehta", 28,"3456 Walnut st, Newark, NJ, 94732"))
            
            var dfFromData = spark.createDataFrame(data).toDF(columns:_*)
            dfFromData.printSchema()
            
            val newDF = dfFromData.map(f=>{
            val nameSplit = f.getAs[String](0).split("\\.").map(_.trim)
                  (nameSplit(0),nameSplit(1),f.getAs[Int](1),f.getAs[String](2))
                })
            
            val finalDF = newDF.toDF("First Name","Last Name", "Age","Address")
            
            finalDF.printSchema()
            
            finalDF.show(false)
            

            输出:

            【讨论】:

              猜你喜欢
              • 2020-04-24
              • 1970-01-01
              • 2018-09-29
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2022-01-26
              • 2011-09-21
              • 2016-05-08
              相关资源
              最近更新 更多