【发布时间】:2020-06-06 15:54:44
【问题描述】:
我有一个 Spark 数据框,假设由 10K ID 组成。 Dataframe 的每一行由一对 ID 及其欧几里德距离组成(每个 ID 代表一个文档。数据框如下所示:
ID_source | ID_destination | Euclidean Distance
1 1 0.0
1 2 1.3777
1 3 1.38
. . .
. . .
. . .
2 1 0.5555
2 2 0.0
. . .
. . .
. . .
对于每个 ID_source,我希望根据欧几里得距离获得前 10 个 ID_destination。在 Spark 中,我设法用下面的代码行做得很好。如上所述的矩阵被命名为similarity_join。
window = Window.orderBy(col("id_source")).partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())
df_filtered = similarity_join.select('*', rank().over(window).alias('rank')).orderBy(col("id_source").asc()).filter((col('rank') <= 10))
当我想将结果写入 csv 时会出现问题。
date_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_"+date_now
df_filtered.write.csv(DESCRIPTION_MODEL_PATH+model_filename)
我在最终的 csv 中缺少 ID(在 hadoop 中压缩输出后获得)。当我使用低样本(10-500)时,我拥有所有 ID,但是当使用 5000 ID 样本时,我在 csv 中有很多缺失的 ID。看起来有些分区没有写在磁盘上。即使我使用 coalesce(1),我也有同样的问题。请提供任何帮助。我正在使用 5 台机器(1 台主机,4 台工人)。我打算增加 1000 万个 ID,所以我将有 1000 万个窗口(分区)
【问题讨论】:
-
我认为您应该做的第一件事是在
Window定义中整理您的orderBys。 -
谢谢@mazaneicha,但我相信Spark,sort是orderBy的别名。 stackoverflow.com/questions/40603202/…
标签: python dataframe apache-spark pyspark partition