【问题标题】:Converting Pandas dataframe into Spark dataframe error将 Pandas 数据帧转换为 Spark 数据帧错误
【发布时间】:2016-09-27 13:21:58
【问题描述】:

我正在尝试将 Pandas DF 转换为 Spark 之一。 DF头:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

代码:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

我得到一个错误:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

【问题讨论】:

  • 我的第一个假设是该文件在一列中同时包含数字和字符串,Spark 对此感到困惑。但是,导入时应该由 Pandas 处理。
  • 你的 DF 有列名吗?
  • 是的。我应该禁用它们吗?
  • 否,但如果您将它放到 DF 头输出中会很有帮助。尝试跳过第 11-n 列(带有 NA)并重新运行您的代码
  • 你为什么不用spark-csv

标签: python pandas apache-spark spark-dataframe


【解决方案1】:

我已经用你的数据试过了,它正在工作:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()

【讨论】:

  • 对于我的数据来说,它需要永远
【解决方案2】:

您需要确保您的 pandas 数据框列适合 spark 推断的类型。如果您的 pandas 数据框列出以下内容:

pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

你会得到那个错误尝试:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

现在,确保 .astype(str) 实际上是您希望这些列成为的类型。基本上,当底层 Java 代码尝试从 python 中的对象推断类型时,它会使用一些观察结果并进行猜测,如果该猜测不适用于列中的所有数据,则它试图从 pandas 转换为spark它会失败。

【讨论】:

  • 我发现这很有帮助。后续问题:当我为自己的数据框完成并按照这些步骤操作时,我没有看到 pd.info() 有任何变化。数据框本身究竟是如何变化的?使用 .astype(str) 后如何查看 pandas DataFrame 的变化?
【解决方案3】:

可以通过强加架构来避免与类型相关的错误,如下所示:

注意:使用原始数据(如上)创建了一个文本文件 (test.csv),并插入了假设的列名 ("col1","col2 ",...,"col25")。

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

pandas 数据框的内容:

       col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

接下来,创建架构:

from pyspark.sql.types import *

mySchema = StructType([ StructField("col1", LongType(), True)\
                       ,StructField("col2", IntegerType(), True)\
                       ,StructField("col3", IntegerType(), True)\
                       ,StructField("col4", IntegerType(), True)\
                       ,StructField("col5", StringType(), True)\
                       ,StructField("col6", StringType(), True)\
                       ,StructField("col7", IntegerType(), True)\
                       ,StructField("col8", IntegerType(), True)\
                       ,StructField("col9", IntegerType(), True)\
                       ,StructField("col10", IntegerType(), True)\
                       ,StructField("col11", StringType(), True)\
                       ,StructField("col12", StringType(), True)\
                       ,StructField("col13", IntegerType(), True)\
                       ,StructField("col14", IntegerType(), True)\
                       ,StructField("col15", IntegerType(), True)\
                       ,StructField("col16", IntegerType(), True)\
                       ,StructField("col17", IntegerType(), True)\
                       ,StructField("col18", IntegerType(), True)\
                       ,StructField("col19", IntegerType(), True)\
                       ,StructField("col20", IntegerType(), True)\
                       ,StructField("col21", IntegerType(), True)\
                       ,StructField("col22", IntegerType(), True)\
                       ,StructField("col23", IntegerType(), True)\
                       ,StructField("col24", IntegerType(), True)\
                       ,StructField("col25", IntegerType(), True)])

注意True(暗示允许为空)

创建 pyspark 数据框:

df = spark.createDataFrame(pdDF,schema=mySchema)

确认 pandas 数据框现在是 pyspark 数据框:

type(df)

输出:

pyspark.sql.dataframe.DataFrame

旁白

要解决 Kate 在下面的评论 - 强加一个通用(字符串)模式,您可以执行以下操作:

df=spark.createDataFrame(pdDF.astype(str)) 

【讨论】:

  • 是否可以将模式创建部分概括为仅将所有列创建为某种类型?例如,只需告诉它所有列为 StringType(而不是单独分配每一列)
  • df=spark.createDataFrame(pdPD.astype(str))
  • 嗨,格兰特,在您创建“mySchema”的步骤中,您必须输入所有内容吗?有没有办法从熊猫数据框的示例中提取模式?谢谢。
  • 是的 - 必须全部输入(复制并粘贴并在必要时进行更改)。我发现试图让 spark 数据框从 pandas 数据框推断模式(如上面的原始问题)风险太大。我的看法是,强制/实施正确的模式是风险最低的策略。如果您最初不能强加所需的模式,那么快速而肮脏的方法是对所有内容强加一个字符串模式(如上所示)并在稍后阶段更正类型。
【解决方案4】:

我曾经收到过类似的错误消息,就我而言,这是因为我的 pandas 数据框包含 NULL。我会建议在转换为 spark 之前尝试在 pandas 中处理此问题(这解决了我的问题)。

【讨论】:

    【解决方案5】:

    我制作了这个脚本,它适用于我的 10 个熊猫数据框

    from pyspark.sql.types import *
    
    # Auxiliar functions
    def equivalent_type(f):
        if f == 'datetime64[ns]': return TimestampType()
        elif f == 'int64': return LongType()
        elif f == 'int32': return IntegerType()
        elif f == 'float64': return FloatType()
        else: return StringType()
    
    def define_structure(string, format_type):
        try: typo = equivalent_type(format_type)
        except: typo = StringType()
        return StructField(string, typo)
    
    # Given pandas dataframe, it will return a spark's dataframe.
    def pandas_to_spark(pandas_df):
        columns = list(pandas_df.columns)
        types = list(pandas_df.dtypes)
        struct_list = []
        for column, typo in zip(columns, types): 
          struct_list.append(define_structure(column, typo))
        p_schema = StructType(struct_list)
        return sqlContext.createDataFrame(pandas_df, p_schema)
    

    你也可以在这个gist看到它

    有了这个你只需要打电话给spark_df = pandas_to_spark(pandas_df)

    【讨论】:

    • 验证了这一切,还验证了输出从 pyspark 到 parquet 再到 scala。谢谢贡萨洛。不会开始知道如何,但这似乎是对开源社区的杰出贡献。可能像 pd.to_sparkdf() 什么的。
    • Gonzalo,我只是为了支持 ArrayType[StringType] 而分叉了你的要点。再次感谢。读者们,这是从 pandas 到 pyspark 和 scala spark 的绝佳解决方案。
    • 这个解决方案太棒了!感谢您分享它,它为我节省了大量时间来进行此转换而无需进行大量调整,并且非常适合转换为临时表。
    • 警告:如果您尝试转换由日期和时间组成的日期时间对象(如pd.to_datetime('2020-01-01 13:45:12')),则时间信息会因您的方法而丢失。为了解决这个问题,请将DateType() 更改为TimestampType()
    • 这应该是一个公认的答案!谢谢贡萨洛!
    【解决方案6】:

    在 spark version >= 3 中,您可以在一行中将 pandas 数据帧转换为 pyspark 数据帧

    使用 spark.createDataFrame(pandasDF)

    dataset = pd.read_csv("data/AS/test_v2.csv")
    
    sparkDf = spark.createDataFrame(dataset);
    

    如果您对 spark 会话变量感到困惑, spark会话如下

    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
    
    spark = SparkSession \
        .builder \
        .getOrCreate()
    

    【讨论】:

    • 谢谢!我花了很多时间在 pandas 和 spark 之间构建一个转换器,甚至为它创建了一个 github repo。这确实很容易,至少对于简单的数据类型。
    猜你喜欢
    • 2020-07-24
    • 1970-01-01
    • 2016-01-03
    • 2015-09-08
    • 1970-01-01
    • 1970-01-01
    • 2021-04-19
    • 1970-01-01
    • 2017-02-04
    相关资源
    最近更新 更多