【问题标题】:Problem loading csv into DataFrame in PySpark在 PySpark 中将 csv 加载到 DataFrame 时出现问题
【发布时间】:2019-02-22 00:25:15
【问题描述】:

我正在尝试使用 AWS Glue 中的 ETL 作业将一堆 CSV 文件聚合为一个,并以 ORC 格式将其输出到 S3。我的聚合 CSV 如下所示:

header1,header2,header3
foo1,foo2,foo3
bar1,bar2,bar3

我有一个名为aggregated_csv 的聚合CSV 的字符串表示形式,其内容为header1,header2,header3\nfoo1,foo2,foo3\nbar1,bar2,bar3。 我读过 pyspark 有一种将 CSV 文件转换为 DataFrames 的简单方法(我需要这样才能利用 Glue 在 ORC 中轻松输出的能力)。这是我尝试过的sn-p:

def f(glueContext, aggregated_csv, schema):
    with open('somefile', 'a+') as agg_file:
        agg_file.write(aggregated_csv)
        #agg_file.seek(0)
        df = glueContext.read.csv(agg_file, schema=schema, header="true")
        df.show()

无论有无搜索,我都试过了。当我不调用 seek() 时,作业成功完成,但 df.show() 不显示除标题以外的任何数据。当我调用 seek() 时,我得到以下异常:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-48-255.us-west-2.compute.internal:8020/user/root/header1,header2,header3\n;'

由于 seek 似乎改变了行为,并且由于我的 csv 中的标头是异常字符串的一部分,我假设问题与我将文件传递给 glueContext.read.csv() 时文件光标所在的位置有关,但是我不知道如何解决它。如果我取消注释seek(0) 调用并添加agg_file.read() 命令,我可以按预期看到文件的全部内容。我需要更改哪些内容才能成功读取刚刚写入 spark 数据帧的 csv 文件?

【问题讨论】:

    标签: python csv dataframe pyspark aws-glue


    【解决方案1】:

    我认为您将错误的参数传递给 csv 函数。我相信GlueContext.read.csv() 将获得DataFrameReader.csv() 的一个实例,它的签名将文件名作为第一个参数,而您传递的是一个类似文件的对象。

    def f(glueContext, aggregated_csv, schema):
        with open('somefile', 'a+') as agg_file:
            agg_file.write(aggregated_csv)
            #agg_file.seek(0)
        df = glueContext.read.csv('somefile', schema=schema, header="true")
        df.show()
    

    但是,如果您希望它写入一个 ORC 文件,并且您已经将数据读取为aggregated_csv,您可以直接从元组列表中创建一个DataFrame

    df = spark.createDataFrame([('foo1','foo2','foo3'), ('bar1','bar2','bar3')], ['header1', 'header2', 'header3'])
    

    然后,如果您需要 Glue DynamicFrame 使用 fromDF 函数

    dynF = fromDF(df, glueContext, 'myFrame')
    

    还有一点:编写 ORC 不需要胶水——激发它完全可以做到。只需使用DataFrameWriter.orc() 函数:

    df.write.orc('s3://path')
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-04-25
      • 2015-10-18
      • 1970-01-01
      • 2020-10-21
      • 1970-01-01
      • 2016-06-07
      • 2021-01-20
      相关资源
      最近更新 更多