【问题标题】:Spark scala creating dataFrame from rdd using Row and SchemaSpark scala 使用 Row 和 Schema 从 rdd 创建数据帧
【发布时间】:2019-02-17 04:26:33
【问题描述】:

我正在尝试从 RDD 创建一个数据帧,以便能够写入具有以下格式的 json 示例 json 如下所示(预期输出)

“1234”:[ { 位置:'abc', 成本1:1.234, 成本2:2.3445 }, { 位置:'www', 成本1:1.534, 成本2:6.​​3445 } ]

我能够以字符串格式生成带有 cost1 和 cost2 的 json。但我希望 cost1 和 cost2 加倍。 使用定义的模式从 rdd 创建数据框时出现错误。 不知何故,数据被视为字符串而不是双精度。 有人可以帮我解决这个问题吗? 下面是我的示例实现的 scala 代码

object csv2json {
  def f[T](v: T) = v match {
  case _: Int    => "Int"
  case _: String => "String"
  case _: Float => "Float"
  case _: Double => "Double"
  case _:BigDecimal => "BigDecimal"
  case _         => "Unknown"
  }

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val input_df = Seq(("12345", "111","1.34","2.34"),("123456", "112","1.343","2.344"),("1234", "113","1.353","2.354"),("1231", "114","5.343","6.344")).toDF("item_id","loc","cost1","cost2")
    input_df.show()  
    val inputRDD =  input_df.rdd.map(data => {

        val  nodeObj  = scala.collection.immutable.Map("nodeId" -> data(1).toString()
        ,"soc" -> data(2).toString().toDouble
        ,"mdc" -> data(3).toString().toDouble)
        (data(0).toString(),nodeObj)
      })

      val inputRDDAgg = inputRDD.aggregateByKey(scala.collection.mutable.ListBuffer.empty[Any])((nodeAAggreg,costValue) => nodeAAggreg += costValue , (nodeAAggreg,costValue) => nodeAAggreg ++ costValue)

      val inputRDDAggRow = inputRDDAgg.map(data => {
        println(data._1 + "and------ " + f(data._1))
        println(data._2 + "and------ " + f(data._2))

        val  skuObj  = Row(
         data._1,
         data._2)
        skuObj
      }
      )

      val innerSchema =  ArrayType(MapType(StringType, DoubleType, true))
      val schema:StructType = StructType(Seq(StructField(name="skuId", dataType=StringType),StructField(name="nodes", innerSchema)))
      val finalJsonDF = spark.createDataFrame(inputRDDAggRow, schema)
      finalJsonDF.show()
  }
}

下面是异常堆栈跟踪:

java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, skuId), StringType), true, false) AS skuId#32
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), if (isnull(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), MapType(StringType,DoubleType,true)))) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, nodes), ArrayType(MapType(StringType,DoubleType,true),true)), None) AS nodes#33
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

【问题讨论】:

    标签: scala apache-spark dataframe row rdd


    【解决方案1】:

    我建议您通过使用 inbult 函数来保留数据集或数据框,因为它们是 rdds 的优化版本。

    因此您可以执行以下操作来满足您的要求

    import org.apache.spark.sql.functions._
    val finalJsonDF = input_df
      .groupBy("item_id")
      .agg(
        collect_list(
          struct(col("loc"), col("cost1").cast("double"), col("cost2").cast("double")))
          .as("jsonData"))
    

    collect_liststruct 是内置函数

    这应该给你

    +-------+-------------------+
    |item_id|jsonData           |
    +-------+-------------------+
    |123456 |[[112,1.343,2.344]]|
    |1234   |[[113,1.353,2.354]]|
    |1231   |[[114,5.343,6.344]]|
    |12345  |[[111,1.34,2.34]]  |
    +-------+-------------------+
    

    并根据需要将 jsonData 保存到 json 文件中

    finalJsonDF.coalesce(1).write.json("path to output file")
    

    应该给你

    {"item_id":"123456","jsonData":[{"loc":"112","col2":1.343,"col3":2.344}]}
    {"item_id":"1234","jsonData":[{"loc":"113","col2":1.353,"col3":2.354}]}
    {"item_id":"1231","jsonData":[{"loc":"114","col2":5.343,"col3":6.344}]}
    {"item_id":"12345","jsonData":[{"loc":"111","col2":1.34,"col3":2.34}]}
    

    【讨论】:

      【解决方案2】:

      我在您的代码中发现架构不匹配。我做了简单的修复作为一种解决方法

      我将data(1).toString 转换为data(1).toString.toDouble。在您的ArrayType(MapType(StringType, DoubleType, true)) 中,您提到所有值都是Double,其中您的值之一是String。我相信这就是问题所在。

      val inputRDD =  input_df.rdd.map(data => {
      
            val  nodeObj  = scala.collection.immutable.Map("nodeId" -> data(1).toString.toDouble
              ,"soc" -> data(2).toString().toDouble
              ,"mdc" -> data(3).toString().toDouble)
            (data(0).toString(),nodeObj)
          })
      

      输出

      +------+--------------------------------------------------+
      |skuId |nodes                                             |
      +------+--------------------------------------------------+
      |1231  |[Map(nodeId -> 114.0, soc -> 5.343, mdc -> 6.344)]|
      |12345 |[Map(nodeId -> 111.0, soc -> 1.34, mdc -> 2.34)]  |
      |123456|[Map(nodeId -> 112.0, soc -> 1.343, mdc -> 2.344)]|
      |1234  |[Map(nodeId -> 113.0, soc -> 1.353, mdc -> 2.354)]|
      +------+--------------------------------------------------+
      

      希望这会有所帮助!

      【讨论】:

      • 通过此更改,nodeID 更改为 double。但是在我的 json 中,我希望 nodeId 是 String 并且只有 cost 必须是两倍。因此,在模式中,我给出了 StringType 和 DoubleType。如果我希望 nodeId 是字符串,您能否建议对架构定义进行任何更改?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-09-27
      • 2017-06-13
      • 2019-08-09
      • 1970-01-01
      • 2016-06-20
      • 2018-06-16
      • 2019-08-07
      相关资源
      最近更新 更多