【问题标题】:Pyspark Dropping RDD Rows Without FilterPyspark 丢弃没有过滤器的 RDD 行
【发布时间】:2019-07-18 18:23:57
【问题描述】:

我编写了一个 Pyspark 程序,它获取同一个输入文件的两个相同副本,并将数据转换为两个新文件,每个文件都有自己的格式。我将这两个文件读入数据帧,其中包含相同的行数。之后,我将这些数据帧改回 RDD 并应用不同的映射逻辑来转换行的字段(映射时不应用过滤器)。但是,输出数据帧不包含相同数量的行 - 它们在没有解释的情况下丢弃。

我尝试更改逻辑顺序,打印出各个阶段的行数等。日志不包含错误或警告,只有我自己的打印语句显示行数减少。

print("Input rows (f2): " + str(f2_df_count))
print("Input rows (f1): " + str(f1_df_count))


f2_rdd = f2_temp_df.rdd.map(list).map(lambda line:
    ("A",
    line[52].strip(),
    ...
    line[2].zfill(5))
f2_df = sqlContext.createDataFrame(f2_rdd, f2_SCHEMA).dropDuplicates()
f2_df.write.format(OUTPUT_FORMAT).options(delimiter='|').save(f2_OUTPUT)
f2_count = f2_df.count()


f1_rdd = f1_temp_df.rdd.map(list).map(lambda line:
    ("B",
    line[39],
    ...
    line[13] if line[16] != "D" else "C")
f1_df = sqlContext.createDataFrame(f1_rdd, f1_SCHEMA).dropDuplicates()
f1_df.write.format(OUTPUT_FORMAT).options(delimiter='|').save(f1_OUTPUT)
f1_count = f1_df.count()


print("F2 output rows: " + str(f2_count) + " rows (dropped " + str(f2_df_count - f2_count) + ").")
print("F1 output rows: " + str(f1_count) + " rows (dropped " + str(f1_df_count - f1_count) + ").")

没有错误消息,但我的日志清楚地显示正在删除行。更奇怪的是,它们被不一致地丢弃。 f1 丢失的行数与 f2 不同。

Input rows (f2): 261
Input rows (f1): 261
F2 output rows: 260 rows (dropped 1).
F1 output rows: 259 rows (dropped 2).

有时在较大的运行中差异会更大,大约为 100-200 行。如果有人能解释可能发生的事情以及我如何解决它,我将不胜感激。

【问题讨论】:

    标签: python pyspark


    【解决方案1】:

    答案是我假设之前删除了重复项,但是在将 RDD 重新创建为数据框后,我包含了一个额外的 dropDuplicate() 调用。对不起任何不必要地花时间在这上面的人!

    【讨论】:

      猜你喜欢
      • 2018-11-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多