【问题标题】:Spark Dataframe : Missing files when writing a dataframe into CSV after partitionning over a windowSpark Dataframe:在窗口分区后将数据帧写入CSV时丢失文件
【发布时间】:2020-06-06 15:54:44
【问题描述】:

我有一个 Spark 数据框,假设由 10K ID 组成。 Dataframe 的每一行由一对 ID 及其欧几里德距离组成(每个 ID 代表一个文档。数据框如下所示:

ID_source | ID_destination | Euclidean Distance
1           1                0.0
1           2                1.3777
1           3                1.38
.           .                .
.           .                .
.           .                .
2           1                0.5555
2           2                0.0
.           .                .
.           .                .
.           .                .

对于每个 ID_source,我希望根据欧几里得距离获得前 10 个 ID_destination。在 Spark 中,我设法用下面的代码行做得很好。如上所述的矩阵被命名为similarity_join。

window = Window.orderBy(col("id_source")).partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())
df_filtered = similarity_join.select('*', rank().over(window).alias('rank')).orderBy(col("id_source").asc()).filter((col('rank') <= 10))

当我想将结果写入 csv 时会出现问题。

date_now =  datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_"+date_now 
df_filtered.write.csv(DESCRIPTION_MODEL_PATH+model_filename)

我在最终的 csv 中缺少 ID(在 hadoop 中压缩输出后获得)。当我使用低样本(10-500)时,我拥有所有 ID,但是当使用 5000 ID 样本时,我在 csv 中有很多缺失的 ID。看起来有些分区没有写在磁盘上。即使我使用 coalesce(1),我也有同样的问题。请提供任何帮助。我正在使用 5 台机器(1 台主机,4 台工人)。我打算增加 1000 万个 ID,所以我将有 1000 万个窗口(分区)

【问题讨论】:

标签: python dataframe apache-spark pyspark partition


【解决方案1】:

最后,问题不在于分区,也不在于写作部分。但相反,这是由于构建数据帧 (similarity_join) 的算法 (Bucketed Random LSH);该算法是非确定性的,因此结果的数量取决于随机选择的参数。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-01-19
    • 1970-01-01
    • 2018-03-21
    • 2020-03-24
    • 2018-11-30
    • 2018-03-21
    • 2016-03-30
    相关资源
    最近更新 更多