【问题标题】:Parallelize GZip file processing Spark并行化 GZip 文件处理 Spark
【发布时间】:2016-05-26 12:27:59
【问题描述】:

我有大量需要转换为 Parquet 的 GZip 文件。由于 GZip 的压缩特性,不能对一个文件进行并行处理。

但是,既然我有很多,有没有一种相对简单的方法可以让每个节点都做部分文件?这些文件位于 HDFS 上。我假设我不能使用 RDD 基础架构来编写 Parquet 文件,因为这一切都是在驱动程序上完成的,而不是在节点本身上完成的。

我可以并行化文件名列表,编写一个处理 Parquet 本地的函数并将它们保存回 HDFS。我不知道该怎么做。我觉得我遗漏了一些明显的东西,谢谢!

这被标记为重复问题,但事实并非如此。我完全了解 Spark 能够将它们作为 RDD 读取而不必担心压缩,我的问题更多是关于如何将这些文件并行转换为结构化 Parquet 文件。

如果我知道如何在没有 Spark 的情况下与 Parquet 文件进行交互,我可以这样做:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    write_csv_to_parquet_on_hdfs(file_to)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

这可以让我并行化,但是我不知道如何在本地环境中与 HDFS 和 Parquet 交互。我也想知道:

1) 怎么做

或者..

2) 如何使用 PySpark 以不同的方式并行化此过程

【问题讨论】:

  • 你的问题很好。这可能不是它的论坛,因为它似乎不是一个编程问题。
  • 嗯,这是关于在 PySpark 中实现并行化
  • 好的,这并没有真正出现在问题中。我的建议:在 pyspark 中进行某种尝试,如果它不起作用,请将您尝试的内容带到这里(在一个新问题中)。
  • 好吧,我又把问题扩大了,我给zero323发消息。我不知道如何处理它,这就是我寻求帮助的原因:) 谢谢

标签: python hadoop apache-spark gzip pyspark


【解决方案1】:

我会建议以下两种方法之一(在实践中,我发现第一种在性能方面可以提供更好的结果)。

将每个 Zip 文件写入单独的 Parquet 文件

这里您可以使用pyarrow 将 Parquet-File 写入 HDFS:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    pyarrow_table = to_pyarrow_table(gzipped_csv)
    hdfs_client = pyarrow.HdfsClient()
    with hdfs_client.open(file_to, "wb") as f:
        pyarrow.parquet.write_table(pyarrow_table, f)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

pyarrow.Table对象的获取方式有两种:

  • 要么从 pandas DataFrame 中获取(在这种情况下,您还可以使用 pandas 的 read_csv() 函数):pyarrow_table = pyarrow.Table.from_pandas(pandas_df)

  • 或使用pyarrow.Table.from_arrays手动构造它

要让 pyarrow 与 HDFS 一起工作,需要正确设置多个环境变量,请参阅here

将所有 Zip 文件中的行连接到一个 Parquet 文件中

def get_rows_from_gzip(file_from):
    rows = read_gzip_file(file_from)
    return rows

# read the rows of each zip file into a Row object
rows_rdd = filenameRDD.map(lambda x: get_rows_from_gzip(x[0]))

# flatten list of lists
rows_rdd = rows_rdd.flatMap(lambda x: x)

# convert to DataFrame and write to Parquet
df = spark_session.create_DataFrame(rows_rdd)
df.write.parquet(file_to)

如果事先知道数据的schema,传入一个schema对象到create_DataFrame会加快DataFrame的创建速度。

【讨论】:

    猜你喜欢
    • 2017-03-22
    • 2016-07-31
    • 1970-01-01
    • 1970-01-01
    • 2017-04-24
    • 2016-08-14
    • 2020-05-02
    • 2020-06-06
    • 1970-01-01
    相关资源
    最近更新 更多