【问题标题】:Infer Schema from rdd to Dataframe in Spark Scala在 Spark Scala 中从 rdd 推断模式到 Dataframe
【发布时间】:2020-07-24 15:57:30
【问题描述】:

这个问题是来自(Spark - creating schema programmatically with different data types)的参考

我正在尝试从 rdd 推断模式到 Dataframe ,下面是我的代码

 def inferType(field: String) = field.split(":")(1) match {
    case "Integer" => IntegerType
    case "Double" => DoubleType
    case "String" => StringType
    case "Timestamp" => TimestampType
    case "Date" => DateType
    case "Long" => LongType
    case _ => StringType
 }


val header = c1:String|c2:String|c3:Double|c4:Integer|c5:String|c6:Timestamp|c7:Long|c8:Date

val df1 = Seq(("a|b|44.44|5|c|2018-01-01 01:00:00|456|2018-01-01")).toDF("data")
val rdd1 = df1.rdd.map(x => Row(x.getString(0).split("\\|"): _*))

val schema = StructType(header.split("\\|").map(column => StructField(column.split(":")(0), inferType(column), true)))
val df = spark.createDataFrame(rdd1, schema)
df.show()

当我表演时,它会抛出以下错误。我必须对更大规模的数据执行此操作并且无法找到正确的解决方案,请您帮我找到解决方案或任何其他方式,我可以实现这一点。

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int

提前致谢

【问题讨论】:

    标签: scala dataframe apache-spark apache-spark-sql


    【解决方案1】:

    简答:不能使用自定义类型/格式指定字符串/文本。

    您要做的是将字符串解析为 sql 列。与其他示例的不同之处在于,您尝试从 csv 加载。 工作版本可以这样实现:

    // skipped other details such as schematype, spark session...
    
    val header = "c1:String|c2:String|c3:Double|c4:Integer"
    
    // Create `Row` from `Seq`
    val row = Row.fromSeq(Seq("a|b|44.44|12|"))
    
    // Create `RDD` from `Row`
    val rdd: RDD[Row] = spark.sparkContext
      .makeRDD(List(row))
      .map { row =>
        row.getString(0).split("\\|") match {
          case Array(col1, col2, col3, col4) =>
            Row.fromTuple(col1, col2, col3.toDouble, col4.toInt)
        }
      }
    val stt: StructType = StructType(
      header
        .split("\\|")
        .map(column => StructField(column, inferType(column), true))
    )
    
    val dataFrame = spark.createDataFrame(rdd, stt)
    dataFrame.show()
    
    

    从 Scala 类型创建 Row 的原因是在此处引入兼容类型或 Row 受尊重的类型。
    注意我跳过了与日期和时间相关的字段,日期转换很棘手。您可以查看我的另一个答案如何使用格式化日期和时间戳here

    【讨论】:

    • 非常感谢 Bob,感谢您的努力,这里唯一的问题是,我们需要保留 .toDouble 和 .toInt。我正在查看它是否动态转换数据类型,因为我还有其他数据集以及时间戳。如果没有更好的解决方案,我会接受。再次感谢
    • 看这个例子,如果你从 json 读取,csv 那么你不需要担心类型推断:stackoverflow.com/questions/61147303/…
    • 它不仅仅是 csv 和 json bob,我们有 10k 列分为多个集合,我正在寻找一些动态类型推断,谢谢
    猜你喜欢
    • 1970-01-01
    • 2022-11-02
    • 1970-01-01
    • 2017-06-13
    • 2016-03-06
    • 1970-01-01
    • 1970-01-01
    • 2022-09-27
    • 2017-02-03
    相关资源
    最近更新 更多