【发布时间】:2014-11-25 07:02:15
【问题描述】:
我是 BigData 的新手。我需要将 csv/txt 文件转换为 Parquet 格式。我搜索了很多,但找不到任何直接的方法。有什么方法可以实现吗?
【问题讨论】:
我是 BigData 的新手。我需要将 csv/txt 文件转换为 Parquet 格式。我搜索了很多,但找不到任何直接的方法。有什么方法可以实现吗?
【问题讨论】:
我已经在an answer 上发布了关于如何使用 Apache Drill 来做到这一点。但是,如果您熟悉 Python,您现在可以使用 Pandas 和 PyArrow!
使用pip:
pip install pandas pyarrow
或使用conda:
conda install pandas pyarrow -c conda-forge
# csv_to_parquet.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000
csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)
for i, chunk in enumerate(csv_stream):
print("Chunk", i)
if i == 0:
# Guess the schema of the CSV file from the first chunk
parquet_schema = pa.Table.from_pandas(df=chunk).schema
# Open a Parquet file for writing
parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
# Write CSV chunk to the parquet file
table = pa.Table.from_pandas(chunk, schema=parquet_schema)
parquet_writer.write_table(table)
parquet_writer.close()
我没有将此代码与 Apache Drill 版本进行基准测试,但根据我的经验,它非常快,每秒转换数万行(这当然取决于 CSV 文件!)。
编辑:
我们现在可以使用pyarrow.csv.read_csv 将 CSV 文件直接读取到 PyArrow 表中。这可能比使用 Pandas CSV 阅读器更快,尽管它可能不太灵活。
【讨论】:
pyarrow 的经验,只是看到你的评论很好奇)
pd.read_csv 和pyarrow.csv.read_csv 方法接受的参数数量。举个具体的例子,pd.read_csv、sep="..."的情况可以是正则表达式,而pyarrow.csv.read_csv、delimiter="..."必须是单个字符。
[对于 Python]
Pandas 现在直接支持它。
只需使用 read_csv 将 csv 文件读入 pandas 的数据帧,然后使用 to_parquet 将该数据帧写入 parquet 文件即可。
【讨论】:
您可以使用Apache Drill,如Convert a CSV File to Apache Parquet With Drill 中所述。
简而言之:
开始 Apache Drill:
$ cd /opt/drill/bin $ sqlline -u jdbc:drill:zk=local创建 Parquet 文件:
-- 设置默认表格格式为 parquet ALTER SESSION SET `store.format`='parquet'; -- 创建一个 parquet 表,其中包含 CSV 表中的所有数据 创建表 dfs.tmp.`/stats/airport_data/` AS 选择 CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`, CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`, 列 [1] 作为“航空公司”, 列 [2] 作为“IATA_CODE”, 列[3] 为“AIRLINE_2”, 列 [4] 为“IATA_CODE_2”, 列 [5] 作为 `GEO_SUMMARY`, 列 [6] 为“GEO_REGION”, 列[7] 为“ACTIVITY_CODE”, 列 [8] 为“PRICE_CODE”, 列 [9] 作为“终端”, 列 [10] 为“BOARDING_AREA”, CAST(columns[11] AS DOUBLE) 作为`PASSENGER_COUNT` FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;尝试从新 Parquet 文件中选择数据:
-- 从 parquet 表中选择数据 选择 * FROM dfs.tmp.`/stats/airport_data/*`您可以通过转到http://localhost:8047/storage/dfs(来源:CSV and Parquet)来更改dfs.tmp 的位置。
【讨论】:
以下代码是使用 spark2.0 的示例。读取比 inferSchema 选项快得多。 Spark 2.0 转换成 parquet 文件的效率比 spark1.6 高得多。
import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
.schema(df)
.option("header", "true")
.option("delimiter", "\t")
.csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
【讨论】:
1) 可以创建外部 hive 表
create external table emp(name string,job_title string,department string,salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '
2) 另一个存储 parquet 文件的 hive 表
create external table emp_par(name string,job_title string,department string,salary_per_year int)
row format delimited
stored as PARQUET
location 'hdfs location were you want the save parquet file'
将表一数据插入表二:
insert overwrite table emp_par select * from emp
【讨论】:
将 csv 文件读取为 Dataframe in Apache Spark 和 spark-csv package。将数据加载到 Dataframe 后,将数据帧保存到 parquetfile。
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")
【讨论】:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType([
StructField("col1", StringType(), True),
StructField("col2", StringType(), True),
StructField("col3", StringType(), True),
StructField("col4", StringType(), True),
StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')
【讨论】:
您可以使用 https://github.com/fraugster/parquet-go 项目中的 csv2parquet 工具。它比 Apache Drill 使用简单得多
【讨论】:
我制作了一个小型命令行工具来将 CSV 转换为 Parquet:https://github.com/domoritz/csv2parquet。
【讨论】: