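Another option is to use a partitioned Parquet dataset and add one more Parquet file for each DataFrame you want to append. This way you can create hundreds, thousands, or even millions of Parquet files, and when you later read the directory, Spark reads them as a single union.
This example uses pyarrow.
Note: in case you already know where you want a single Parquet file to go, I also show how to write a single unpartitioned Parquet file (example.parquet).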
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
headers=['A', 'B', 'C']
row1 = ['a1', 'b1', 'c1']
row2 = ['a2', 'b2', 'c2']
df1 = pd.DataFrame([row1], columns=headers)
df2 = pd.DataFrame([row2], columns=headers)
# DataFrame.append was removed in pandas 2.0; pd.concat is the equivalent
df3 = pd.concat([df1, df2], ignore_index=True)
table = pa.Table.from_pandas(df3)
pq.write_table(table, 'example.parquet', flavor='spark')
pq.write_to_dataset(table, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
# Adding a new partition (B=b3/C=c3)
row3 = ['a3', 'b3', 'c3']
df4 = pd.DataFrame([row3], columns=headers)
table2 = pa.Table.from_pandas(df4)
pq.write_to_dataset(table2, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
# Add another parquet file to the B=b2/C=c2 partition
# Note this does not overwrite existing partitions, it just appends a new .parquet file.
# If files already exist, then you will get a union result of the two (or multiple) files when you read the partition
row5 = ['a5', 'b2', 'c2']
df5 = pd.DataFrame([row5], columns=headers)
table3 = pa.Table.from_pandas(df5)
pq.write_to_dataset(table3, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
Reading the output afterwards:
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .appName("testing parquet read")
         .getOrCreate())
df_spark = spark.read.parquet('test_part_file')
df_spark.show(25, False)
You should see something like this:
+---+---+---+
|A  |B  |C  |
+---+---+---+
|a5 |b2 |c2 |
|a2 |b2 |c2 |
|a1 |b1 |c1 |
|a3 |b3 |c3 |
+---+---+---+
If you run the same thing end to end again, you should see duplicates like this (all of the previous Parquet files are still there, so Spark unions them):
+---+---+---+
|A  |B  |C  |
+---+---+---+
|a2 |b2 |c2 |
|a5 |b2 |c2 |
|a5 |b2 |c2 |
|a2 |b2 |c2 |
|a1 |b1 |c1 |
|a1 |b1 |c1 |
|a3 |b3 |c3 |
|a3 |b3 |c3 |
+---+---+---+