【问题标题】:Spark DataFrame append row n-time based on a weight col valueSpark DataFrame 根据权重 col 值追加行 n 次
【发布时间】:2018-04-27 15:10:11
【问题描述】:

我想要做的是“过度采样”一个小的 csv 文件,每行中都有一个权重值列。

Age|City|Weight
20 | NY |2
30 | SF |3

 Age|City|
 20 | NY |
 20 | NY |
 30 | SF |
 30 | SF |
 30 | SF |

通过 panda 和 np 我做到了

df = pd.read_csv('file.csv',low_memory=False)
weights=round(df.weight)
df.loc[np.repeat(df.index.values,weights)]

但是它太慢了,它在超过 24 小时内使用 100% 的 1 个 CPU(15 个可用)和所有 65G 内存,最后崩溃。 最终文件应包含超过 7000 万行。

所以我尝试使用 Spark。

rdd.map(lamba x: rdd.udf()) 或类似的东西,结合explode() 应该会有所帮助,但我不明白如何使它正确。 最后,我需要将 DataFrame 或 RDD 保存在一个未分区的 CSV 中:一个我可以与 panda 一起使用的 csv。

谢谢!

【问题讨论】:

    标签: apache-spark dataframe rdd


    【解决方案1】:

    如果您可以将数据加载到内存中并且您打算使用 Pandas 在本地读取输出,则没有任何迹象表明您需要 Spark。简单点就行了

    import csv
    
    with open("input.csv") as fr, open("output.csv", "w") as fw:
        reader = csv.reader(fr)
        writer = csv.writer(fw)
        for age, city, weight in reader:
            if age == "age":
                writer.writerow((age, city))
            else:
                writer.writerows((age, city) for _ in range(int(weight)))
    

    或者更多列(我假设权重是最后一列,根据实际数据的形状调整):

    with open("input.csv") as fr, open("output.csv", "w") as fw:
        reader = csv.reader(fr)
        writer = csv.writer(fw)
        for row:
            if row[0] == "age":
                writer.writerow(row[:-1])  
            else:
                writer.writerows(row[:-1] for _ in range(int(row[-1])))
    

    【讨论】:

    • 我做了一个非常简单的例子,但真正的文件有 246 列 40k 法分,每行的重量在 2000 到 5000 之间。我现在用 3 列尝试你的建议,这需要几个小时.希望它不会崩溃。
    • 这并不真正影响答案。只需调整列(您不必列举这些)。
    • 它正在运行,但使用了所有内存并开始大量交换 1 个 CPU 的 100%。我只需要运行一次就可以了。
    【解决方案2】:

    好吧,我设法让它运行谢谢:

    with open("file_in.csv",encoding='utf-8') as fr, open("file_out.csv", "w",encoding='utf-8') as fw:
    reader = csv.reader(fr)
    writer = csv.writer(fw)
    
    for row in reader:
        if row[0] == "firstColName" :
            writer.writerow(row[:-1])  
        else:
            writer.writerows(row[:-1] for _ in range(int(row[-1])))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-09
      • 2021-12-27
      • 1970-01-01
      • 2018-10-29
      • 1970-01-01
      • 1970-01-01
      • 2019-03-07
      相关资源
      最近更新 更多