【问题标题】:Create DataFrame with null value for few column为几列创建具有空值的 DataFrame
【发布时间】:2017-01-20 17:16:33
【问题描述】:

我正在尝试使用RDD 创建一个DataFrame

首先我使用下面的代码创建一个RDD -

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F"), 
                                 (2, 2, 4, "F"),
                                 (3, 3, 6, "N"),
                                 (4,null,8,"F")))

一切正常——

帐户:org.apache.spark.rdd.RDD[(Int, Any, Int, String)] = ParallelCollectionRDD[0] at parallelize at :27

但是当尝试使用下面的代码从RDD 创建DataFrame

account.toDF("ACCT_ID", "M_CD", "C_CD","IND")

我遇到错误

java.lang.UnsupportedOperationException:Any 类型的架构不是 支持

我分析,每当我将null 值放入Seq 时,只有我得到了错误。

有没有办法添加空值?

【问题讨论】:

  • 使用(1, null: Integer, 2,"F")

标签: scala apache-spark spark-dataframe apache-spark-dataset


【解决方案1】:

不使用 RDD 的替代方法:

import spark.implicits._

val df = spark.createDataFrame(Seq(
  (1, None,    2, "F"),
  (2, Some(2), 4, "F"),
  (3, Some(3), 6, "N"),
  (4, None,    8, "F")
)).toDF("ACCT_ID", "M_CD", "C_CD","IND")

df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+

df.printSchema
root
 |-- ACCT_ID: integer (nullable = false)
 |-- M_CD: integer (nullable = true)
 |-- C_CD: integer (nullable = false)
 |-- IND: string (nullable = true)

【讨论】:

    【解决方案2】:

    问题是 Any 类型太笼统,Spark 不知道如何序列化它。您应该明确提供一些特定类型,在您的情况下为Integer。由于 null 不能分配给 Scala 中的原始类型,因此您可以使用 java.lang.Integer 代替。所以试试这个:

    val account = sc.parallelize(Seq(
                                     (1, null.asInstanceOf[Integer], 2,"F"), 
                                     (2, new Integer(2), 4, "F"),
                                     (3, new Integer(3), 6, "N"),
                                     (4, null.asInstanceOf[Integer],8,"F")))
    

    这是一个输出:

    rdd: org.apache.spark.rdd.RDD[(Int, Integer, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24
    

    以及对应的DataFrame:

    scala> val df = rdd.toDF("ACCT_ID", "M_CD", "C_CD","IND")
    
    df: org.apache.spark.sql.DataFrame = [ACCT_ID: int, M_CD: int ... 2 more fields]
    
    scala> df.show
    +-------+----+----+---+
    |ACCT_ID|M_CD|C_CD|IND|
    +-------+----+----+---+
    |      1|null|   2|  F|
    |      2|   2|   4|  F|
    |      3|   3|   6|  N|
    |      4|null|   8|  F|
    +-------+----+----+---+
    

    您还可以考虑一些更简洁的方法来声明空整数值,例如:

    object Constants {
      val NullInteger: java.lang.Integer = null
    }
    

    【讨论】:

    • 如果我使用case class 创建DataFrame,我应该如何处理它,即,我使用spark.sparkContext.parallellize(Seq(A(_, _), A(_, _))).toDF() 创建DataFrame,而我有case class A(_, _)?我已经尝试过上述技术,但null.asInstanceOf[T] 给了我NullPointerExceptionnull: T(如对问题的评论中所述)给了我an expression of type Null is ineligible for implicit conversion
    猜你喜欢
    • 2018-06-11
    • 2015-11-07
    • 2017-11-14
    • 1970-01-01
    • 2016-12-18
    • 1970-01-01
    • 1970-01-01
    • 2015-10-07
    相关资源
    最近更新 更多