【问题标题】:How to convert a csv file to parquet如何将 csv 文件转换为镶木地板
【发布时间】:2014-11-25 07:02:15
【问题描述】:

我是 BigData 的新手。我需要将 csv/txt 文件转换为 Parquet 格式。我搜索了很多,但找不到任何直接的方法。有什么方法可以实现吗?

【问题讨论】:

    标签: java parquet


    【解决方案1】:

    我已经在an answer 上发布了关于如何使用 Apache Drill 来做到这一点。但是,如果您熟悉 Python,您现在可以使用 PandasPyArrow

    安装依赖项

    使用pip

    pip install pandas pyarrow
    

    或使用conda:

    conda install pandas pyarrow -c conda-forge
    

    将 CSV 分块转换为 Parquet

    # csv_to_parquet.py
    
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    
    csv_file = '/path/to/my.tsv'
    parquet_file = '/path/to/my.parquet'
    chunksize = 100_000
    
    csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)
    
    for i, chunk in enumerate(csv_stream):
        print("Chunk", i)
        if i == 0:
            # Guess the schema of the CSV file from the first chunk
            parquet_schema = pa.Table.from_pandas(df=chunk).schema
            # Open a Parquet file for writing
            parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
        # Write CSV chunk to the parquet file
        table = pa.Table.from_pandas(chunk, schema=parquet_schema)
        parquet_writer.write_table(table)
    
    parquet_writer.close()
    

    我没有将此代码与 Apache Drill 版本进行基准测试,但根据我的经验,它非常快,每秒转换数万行(这当然取决于 CSV 文件!)。


    编辑:

    我们现在可以使用pyarrow.csv.read_csv 将 CSV 文件直接读取到 PyArrow 表中。这可能比使用 Pandas CSV 阅读器更快,尽管它可能不太灵活。

    【讨论】:

    • 为什么不那么灵活? (抱歉,我没有使用pyarrow 的经验,只是看到你的评论很好奇)
    • @sphoenix 我主要指的是pd.read_csvpyarrow.csv.read_csv 方法接受的参数数量。举个具体的例子,pd.read_csvsep="..."的情况可以是正则表达式,而pyarrow.csv.read_csvdelimiter="..."必须是单个字符。
    【解决方案2】:

    [对于 Python]

    Pandas 现在直接支持它。

    只需使用 read_csv 将 csv 文件读入 pandas 的数据帧,然后使用 to_parquet 将该数据帧写入 parquet 文件即可。

    【讨论】:

    • 为什么要为 Java 问题提供 Python 解决方案?
    • 因为已经有一个没有提到 to_parquet(因为它是在 0.21.0 中发布的)。认为这可能对需要基于 python 的解决方案的人有用。
    【解决方案3】:

    您可以使用Apache Drill,如Convert a CSV File to Apache Parquet With Drill 中所述。

    简而言之:

    开始 Apache Drill:

    $ cd /opt/drill/bin $ sqlline -u jdbc:drill:zk=local

    创建 Parquet 文件:

    -- 设置默认表格格式为 parquet ALTER SESSION SET `store.format`='parquet'; -- 创建一个 parquet 表,其中包含 CSV 表中的所有数据 创建表 dfs.tmp.`/stats/airport_data/` AS 选择 CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`, CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`, 列 [1] 作为“航空公司”, 列 [2] 作为“IATA_CODE”, 列[3] 为“AIRLINE_2”, 列 [4] 为“IATA_CODE_2”, 列 [5] 作为 `GEO_SUMMARY`, 列 [6] 为“GEO_REGION”, 列[7] 为“ACTIVITY_CODE”, 列 [8] 为“PRICE_CODE”, 列 [9] 作为“终端”, 列 [10] 为“BOARDING_AREA”, CAST(columns[11] AS DOUBLE) 作为`PASSENGER_COUNT` FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;

    尝试从新 Parquet 文件中选择数据:

    -- 从 parquet 表中选择数据 选择 * FROM dfs.tmp.`/stats/airport_data/*`

    您可以通过转到http://localhost:8047/storage/dfs(来源:CSV and Parquet)来更改dfs.tmp 的位置。

    【讨论】:

    • 我确认这是实现这一目标的最佳和最简单的方法。 Apache Hive 也可以作为替代方案。
    【解决方案4】:

    以下代码是使用 spark2.0 的示例。读取比 inferSchema 选项快得多。 Spark 2.0 转换成 parquet 文件的效率比 spark1.6 高得多。

    import org.apache.spark.sql.types._
    var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
    df = spark.read
              .schema(df)
              .option("header", "true")
              .option("delimiter", "\t")
              .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
    df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
    

    【讨论】:

      【解决方案5】:

      1) 可以创建外部 hive 表

      create  external table emp(name string,job_title string,department string,salary_per_year int)
      row format delimited
      fields terminated by ','
      location '.. hdfs location of csv file '
      

      2) 另一个存储 parquet 文件的 hive 表

      create  external table emp_par(name string,job_title string,department string,salary_per_year int)
      row format delimited
      stored as PARQUET
      location 'hdfs location were you want the save parquet file'
      

      将表一数据插入表二:

      insert overwrite table emp_par select * from emp 
      

      【讨论】:

      • 表 emp_par 已创建为外部表。这应该已创建为普通表,否则您无法将数据插入其中。
      【解决方案6】:

      将 csv 文件读取为 Dataframe in Apache Sparkspark-csv package。将数据加载到 Dataframe 后,将数据帧保存到 parquetfile。

      val df = sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("mode", "DROPMALFORMED")
            .load("/home/myuser/data/log/*.csv")
      df.saveAsParquetFile("/home/myuser/data.parquet")
      

      【讨论】:

        【解决方案7】:
        from pyspark import SparkContext
        from pyspark.sql import SQLContext
        from pyspark.sql.types import *
        import sys
        
        sc = SparkContext(appName="CSV2Parquet")
        sqlContext = SQLContext(sc)
        
        schema = StructType([
            StructField("col1", StringType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True),
            StructField("col5", StringType(), True)])
        rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
        df = sqlContext.createDataFrame(rdd, schema)
        df.write.parquet('/output.parquet')
        

        【讨论】:

          【解决方案8】:

          您可以使用 https://github.com/fraugster/parquet-go 项目中的 csv2parquet 工具。它比 Apache Drill 使用简单得多

          【讨论】:

            【解决方案9】:

            我制作了一个小型命令行工具来将 CSV 转换为 Parquet:https://github.com/domoritz/csv2parquet

            【讨论】:

              猜你喜欢
              • 2017-01-18
              • 2020-08-13
              • 2018-11-09
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2018-01-04
              • 1970-01-01
              • 2016-04-16
              相关资源
              最近更新 更多