【问题标题】:Pivoting DataFrame - Spark SQL透视数据帧 - Spark SQL
【发布时间】:2017-11-15 05:26:03
【问题描述】:

我有一个包含以下内容的 DataFrame:

TradeId|Source
ABC|"USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602"

我想把这些数据转成下面的数据

TradeId|CCY|PV
ABC|USD|333.123
ABC|USD|-789.444
ABC|GBP|1234.567

“来源”列中的 CCY|PV|Date 三元组的数量不固定。我可以在 ArrayList 中执行此操作,但这需要在 JVM 中加载数据并破坏 Spark 的全部意义。

假设我的 DataFrame 如下所示:

DataFrame tradesSnap = this.loadTradesSnap(reportRequest);
String tempTable = getTempTableName();
tradesSnap.registerTempTable(tempTable);
tradesSnap = tradesSnap.sqlContext().sql("SELECT TradeId, Source FROM " + tempTable);

【问题讨论】:

    标签: java scala apache-spark apache-spark-sql pivot


    【解决方案1】:

    如果你读到databricks pivot,它会写成" A pivot is an aggregation where one (or more in the general case) of the grouping columns has its distinct values transposed into individual columns." 我猜这不是你想要的

    我建议您使用withColumnfunctions 来获得您想要的最终输出。考虑到dataframe 是您所拥有的,您可以执行以下操作

    +-------+----------------------------------------------------------------+
    |TradeId|Source                                                          |
    +-------+----------------------------------------------------------------+
    |ABC    |USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602|
    +-------+----------------------------------------------------------------+
    

    您可以使用explodesplitwithColumn 执行以下操作以获得所需的输出

    val explodedDF = dataframe.withColumn("Source", explode(split(col("Source"), "\\|")))
    val finalDF = explodedDF.withColumn("CCY", split($"Source", ",")(0))
      .withColumn("PV", split($"Source", ",")(1))
      .withColumn("Date",  split($"Source", ",")(2))
      .drop("Source")
    
    finalDF.show(false)
    

    最终的输出是

    +-------+---+--------+--------+
    |TradeId|CCY|PV      |Date    |
    +-------+---+--------+--------+
    |ABC    |USD|333.123 |20170605|
    |ABC    |USD|-789.444|20170605|
    |ABC    |GBP|1234.567|20150602|
    +-------+---+--------+--------+
    

    希望能解决你的问题

    【讨论】:

    • 是的,我最终也找到了这个选项。它更容易理解,甚至可以将其添加到我的初始选择查询中。
    • 很高兴听到@Archilles。并感谢您的接受和支持:)
    【解决方案2】:

    您想要实现的目标看起来更像flatMap,而不是旋转。

    简单地说,通过在Dataset 上使用flatMap,您可以对每一行应用一个函数(map),该函数本身会产生一系列行。然后将每组行连接成一个序列 (flat)。

    下面的程序展示了这个想法:

    import org.apache.spark.sql.SparkSession
    
    case class Input(TradeId: String, Source: String)
    
    case class Output(TradeId: String, CCY: String, PV: String, Date: String)
    
    object FlatMapExample {
    
      // This function will produce more rows of output for each line of input
      def splitSource(in: Input): Seq[Output] =
        in.Source.split("\\|", -1).map {
          source =>
            println(source)
            val Array(ccy, pv, date) = source.split(",", -1)
            Output(in.TradeId, ccy, pv, date)
        }
    
      def main(args: Array[String]): Unit = {
    
        // Initialization and loading
        val spark = SparkSession.builder().master("local").appName("pivoting-example").getOrCreate()
        import spark.implicits._
        val input = spark.read.options(Map("sep" -> "|", "header" -> "true")).csv(args(0)).as[Input]
    
        // For each line in the input, split the source and then 
        // concatenate each "sub-sequence" in a single `Dataset`
        input.flatMap(splitSource).show
      }
    
    }
    

    根据您的输入,这将是输出:

    +-------+---+--------+--------+
    |TradeId|CCY|      PV|    Date|
    +-------+---+--------+--------+
    |    ABC|USD| 333.123|20170605|
    |    ABC|USD|-789.444|20170605|
    |    ABC|GBP|1234.567|20150602|
    +-------+---+--------+--------+
    

    如果需要,您现在可以获取结果并将其保存为 CSV。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-02-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多