【问题标题】:How to Convert Many CSV files to Parquet using AWS Glue如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet
【发布时间】:2018-10-03 19:16:46
【问题描述】:

我正在使用 AWS S3、Glue 和 Athena,设置如下:

S3 --> 胶水 --> 雅典娜

我的原始数据以 CSV 文件的形式存储在 S3 上。我正在使用 Glue 进行 ETL,并且我正在使用 Athena 来查询数据。

由于我使用的是 Athena,因此我想将 CSV 文件转换为 Parquet。我现在正在使用 AWS Glue 来执行此操作。这是我正在使用的当前流程:

  1. 运行 Crawler 以读取 CSV 文件并填充数据目录。
  2. 运行 ETL 作业以从数据目录创建 Parquet 文件。
  3. 运行爬虫以使用 Parquet 文件填充数据目录。

Glue 作业一次只允许我转换一个表。如果我有很多 CSV 文件,这个过程很快就会变得难以管理。有没有更好的方法,也许是“正确”的方法,使用 AWS Glue 或其他一些 AWS 服务将 许多 CSV 文件转换为 Parquet?

【问题讨论】:

    标签: amazon-s3 parquet amazon-athena aws-glue


    【解决方案1】:

    我遇到了完全相同的情况,我想有效地循环遍历由爬虫编目的目录表,这些目录表指向 csv 文件,然后将它们转换为镶木地板。不幸的是,网络上还没有太多可用的信息。这就是为什么我在LinkedIn 中写了一篇博客来解释我是如何做到的。请阅读;特别是第 5 点。希望有帮助。请让我知道您的反馈。

    注意:根据 Antti 的反馈,我从下面的博客中粘贴了摘录的解决方案:

    1. 遍历目录/数据库/表

    作业向导带有在数据源上运行预定义脚本的选项。问题是您可以选择的数据源是目录中的单个表。它没有让您选择在整个数据库或一组表上运行作业。无论如何,您可以稍后修改脚本,但是在胶水目录中迭代数据库表的方法也很难找到。有目录 API,但缺少合适的示例。 github 示例 repo 可以丰富更多场景以帮助开发人员。

    经过一番折腾,我想出了下面的脚本来完成这项工作。我使用 boto3 客户端循环遍历表。如果涉及到某人的帮助,我将其粘贴在这里。如果您有更好的建议,我也想听听您的意见

    import sys
    import boto3
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    
    client = boto3.client('glue', region_name='ap-southeast-2')
    
    databaseName = 'tpc-ds-csv'
    print '\ndatabaseName: ' + databaseName
    
    Tables = client.get_tables(DatabaseName=databaseName)
    
    tableList = Tables['TableList']
    
    for table in tableList:
        tableName = table['Name']
        print '\n-- tableName: ' + tableName
    
        datasource0 = glueContext.create_dynamic_frame.from_catalog(
            database="tpc-ds-csv", 
            table_name=tableName, 
            transformation_ctx="datasource0"
        )
    
        datasink4 = glueContext.write_dynamic_frame.from_options(
            frame=datasource0,
            connection_type="s3", 
            connection_options={
                "path": "s3://aws-glue-tpcds-parquet/"+ tableName + "/"
                },
            format="parquet",
            transformation_ctx="datasink4"
        )
    job.commit()
    

    【讨论】:

    • 我很久以前就解决了这个问题。您链接的博客文章中提到的解决方案与我最终所做的几乎相同。我希望 AWS 能够及时更新他们的 Glue 文档。目前严重缺乏。
    • 这不是一个答案,除非您实际上至少提供了答案本身的细节的一瞥。
    • 你说得对,安蒂。那时我是新的贡献者,还在学习。我已经用实际的解决方案编辑了答案
    【解决方案2】:

    请参阅编辑以获取更新信息。

    S3 --> 雅典娜

    为什么不直接在 Athena 中使用 CSV 格式?

    https://docs.aws.amazon.com/athena/latest/ug/supported-format.html

    CSV 是受支持的格式之一。此外,为了提高效率,您可以压缩多个 CSV 文件以加快加载速度。

    支持的压缩,

    https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html

    希望对你有帮助。

    编辑:

    为什么 Parquet 格式比 CSV 更有用?

    https://dzone.com/articles/how-to-be-a-hero-with-powerful-parquet-google-and

    S3 --> 胶水 --> 雅典娜

    有关 CSV 到 Parquet 转换的更多详细信息,

    https://aws.amazon.com/blogs/big-data/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/

    【讨论】:

    • 在使用 Athena 时,我正在使用 Parquet 来提高查询性能并降低查询成本。
    • 感谢您的洞察力。有时问题比答案更能提供信息。
    • 您提供的最后一个链接描述了我当前的过程——将单个表的数据转换为 Parquet。我正在为许多表寻找“最佳实践”或“易于管理”的方法。
    【解决方案3】:

    我不是 Glue 的忠实粉丝,也不是从数据创建模式

    这是在 Athena 中的操作方法,它比 Glue 快得多。

    这是用于 CSV 文件的:

    create table foo (
      id int,
      name string,
      some date
    )
    row format delimited
      fields terminated by ','
    location 's3://mybucket/path/to/csvs/'
    

    这是用于镶木地板文件的:

    create table bar 
    with (
      external_location = 's3://mybucket/path/to/parquet/',
      format = 'PARQUET'
    )
    as select * from foo 
    

    您不需要为镶木地板创建该路径,即使您使用分区

    【讨论】:

      【解决方案4】:

      您可以直接将 JSON 或 CSV 文件转换为 parquet,而无需先将其导入目录。

      这是用于 JSON 文件的 - 下面的代码将转换托管在 rawFiles 目录中的任何内容

      import sys
      from awsglue.job import Job 
      from awsglue.transforms import *
      from awsglue.context import GlueContext
      from pyspark.context import SparkContext
      from awsglue.utils import getResolvedOptions
      
      ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME'])
      
      sparkContext = SparkContext()
      glueContext = GlueContext(sparkContext)
      spark = glueContext.spark_session
      job = Job(glueContext) job.init(args['JOB_NAME'], args)
      
      s3_json_path = 's3://rawFiles/'  
      s3_parquet_path = 's3://convertedFiles/'
      
      output = spark.read.load(s3_json_path, format='json') 
      output.write.parquet(s3_parquet_path)
      
      job.commit()
      

      【讨论】:

        【解决方案5】:

        听起来在您的第 1 步中,您正在抓取单个 csv 文件(例如 some-bucket/container-path/file.csv),但如果您将抓取工具设置为查看路径级别而不是文件级别(例如 some-bucket/container-path/) 并且您的所有 csv 文件都是统一的,那么爬虫应该只创建一个外部表而不是每个文件的外部表,您将能够从所有文件中提取数据一次。

        【讨论】:

        • 第 1 步中的爬虫设置为爬取文件夹路径。这将创建一个具有多个表的数据库。每个表的数据都存储为 CSV 文件。我正在尝试使用单个脚本或作业将所有这些 CSV 文件转换为 Parquet。换句话说,我想将给定数据库的所有 CSV 文件转换为 Parquet。 AWS Glue 只允许我为每个作业选择一个表。我正在寻找一种方法来有效地为多个表执行此操作。
        • @mark - stackoverflow.com/users/5504459/mark-s,我正在努力实现同样的目标。你有一个可行的解决方案吗?
        • @nitinr708 我的解决方案可能已经过时(例如,pandas 现在应该可用于 Python shell Glue 作业)。基本方法是遍历所有 csv 文件,将每个文件读入数据帧,然后写入 parquet。 Pandas DF、Glue DynamicFrames 和 PySpark DF 是您的选择。每个都有不同的 API 用于读取/写入 DF。这些链接应该会有所帮助——胶水:docs.aws.amazon.com/glue/latest/dg/…。 PySpark:stackoverflow.com/a/45873742/5504459。熊猫:stackoverflow.com/a/37703861/5504459
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-07-22
        • 1970-01-01
        • 2018-04-04
        • 2020-09-22
        • 1970-01-01
        • 2018-12-15
        相关资源
        最近更新 更多