【问题标题】:Create Spark DataFrame. Can not infer schema for type: <type 'float'>创建 Spark DataFrame。无法推断类型的架构:<type 'float'>
【发布时间】:2015-12-20 21:57:08
【问题描述】:

有人可以帮我解决 Spark DataFrame 遇到的这个问题吗?

当我执行myFloatRDD.toDF() 时出现错误:

TypeError: 无法推断类型的架构:type 'float'

我不明白为什么......

例子:

myFloatRdd = sc.parallelize([1.0,2.0,3.0])
df = myFloatRdd.toDF()

谢谢

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    SparkSession.createDataFrame,在后台使用,需要 Row/tuple/list/dict* 或 @987654328 的 RDD / list @,除非提供了带有 DataType 的架构。尝试像这样将浮点数转换为元组:

    myFloatRdd.map(lambda x: (x, )).toDF()
    

    甚至更好:

    from pyspark.sql import Row
    
    row = Row("val") # Or some other column name
    myFloatRdd.map(row).toDF()
    

    要从标量列表创建DataFrame,您必须直接使用SparkSession.createDataFrame 并提供架构***:

    from pyspark.sql.types import FloatType
    
    df = spark.createDataFrame([1.0, 2.0, 3.0], FloatType())
    
    df.show()
    
    ## +-----+
    ## |value|
    ## +-----+
    ## |  1.0|
    ## |  2.0|
    ## |  3.0|
    ## +-----+
    

    但对于一个简单的范围,最好使用SparkSession.range

    from pyspark.sql.functions import col
    
    spark.range(1, 4).select(col("id").cast("double"))
    

    * 不再支持。

    ** Spark SQL 还对暴露 __dict__ 的 Python 对象的模式推断提供有限支持。

    *** 仅在 Spark 2.0 或更高版本中受支持。

    【讨论】:

    • 我是新手。你能解释一下myFloatRdd.map(lambda x: (x, )).toDF()是如何解决这个问题的吗? map(lambda x: (x,)) 是否只是将 RDD 对象转换为行列表?
    • @kasa tuples (-> struct) 有推理映射,没有标量。
    • 使用第一个选项,可以在同一行提供 col 名称:rdd.map(lambda x: (x, )).toDF(['colName'])
    【解决方案2】:
    from pyspark.sql.types import IntegerType, Row
    
    mylist = [1, 2, 3, 4, None ]
    l = map(lambda x : Row(x), mylist)
    # notice the parens after the type name
    df=spark.createDataFrame(l,["id"])
    df.where(df.id.isNull() == False).show()
    

    基本上,你需要将你的 int 初始化为 Row(),然后我们就可以使用 schema

    【讨论】:

      【解决方案3】:
      使用反射推断架构
      from pyspark.sql import Row
      # spark - sparkSession
      sc = spark.sparkContext
      
      # Load a text file and convert each line to a Row.
      orders = sc.textFile("/practicedata/orders")
      #Split on delimiters
      parts = orders.map(lambda l: l.split(","))
      #Convert to Row
      orders_struct = parts.map(lambda p: Row(order_id=int(p[0]), order_date=p[1], customer_id=p[2], order_status=p[3]))
      for i in orders_struct.take(5): print(i)
      #convert the RDD to DataFrame
      
      orders_df = spark.createDataFrame(orders_struct)
      
      以编程方式指定架构
      from pyspark.sql import Row
      # spark - sparkSession
      sc = spark.sparkContext
      
      # Load a text file and convert each line to a Row.
      orders = sc.textFile("/practicedata/orders")
      #Split on delimiters
      parts = orders.map(lambda l: l.split(","))
      #Convert to tuple
      orders_struct = parts.map(lambda p: (p[0], p[1], p[2], p[3].strip()))
      
      #convert the RDD to DataFrame
      
      orders_df = spark.createDataFrame(orders_struct)
      
      # The schema is encoded in a string.
      schemaString = "order_id order_date customer_id status"
      
      fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
      schema = Struct
      
      ordersDf = spark.createDataFrame(orders_struct, schema)
      

      类型(字段)

      【讨论】:

      • 嗨!欢迎来到 StackOverflow。如果您认为您对已接受的answer, 有什么要补充的,请明确说明,并避免添加无法解释的 sn-ps 代码。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-08-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-30
      • 2017-11-27
      相关资源
      最近更新 更多