【发布时间】:2019-07-18 18:23:57
【问题描述】:
我编写了一个 Pyspark 程序,它获取同一个输入文件的两个相同副本,并将数据转换为两个新文件,每个文件都有自己的格式。我将这两个文件读入数据帧,其中包含相同的行数。之后,我将这些数据帧改回 RDD 并应用不同的映射逻辑来转换行的字段(映射时不应用过滤器)。但是,输出数据帧不包含相同数量的行 - 它们在没有解释的情况下丢弃。
我尝试更改逻辑顺序,打印出各个阶段的行数等。日志不包含错误或警告,只有我自己的打印语句显示行数减少。
print("Input rows (f2): " + str(f2_df_count))
print("Input rows (f1): " + str(f1_df_count))
f2_rdd = f2_temp_df.rdd.map(list).map(lambda line:
("A",
line[52].strip(),
...
line[2].zfill(5))
f2_df = sqlContext.createDataFrame(f2_rdd, f2_SCHEMA).dropDuplicates()
f2_df.write.format(OUTPUT_FORMAT).options(delimiter='|').save(f2_OUTPUT)
f2_count = f2_df.count()
f1_rdd = f1_temp_df.rdd.map(list).map(lambda line:
("B",
line[39],
...
line[13] if line[16] != "D" else "C")
f1_df = sqlContext.createDataFrame(f1_rdd, f1_SCHEMA).dropDuplicates()
f1_df.write.format(OUTPUT_FORMAT).options(delimiter='|').save(f1_OUTPUT)
f1_count = f1_df.count()
print("F2 output rows: " + str(f2_count) + " rows (dropped " + str(f2_df_count - f2_count) + ").")
print("F1 output rows: " + str(f1_count) + " rows (dropped " + str(f1_df_count - f1_count) + ").")
没有错误消息,但我的日志清楚地显示正在删除行。更奇怪的是,它们被不一致地丢弃。 f1 丢失的行数与 f2 不同。
Input rows (f2): 261
Input rows (f1): 261
F2 output rows: 260 rows (dropped 1).
F1 output rows: 259 rows (dropped 2).
有时在较大的运行中差异会更大,大约为 100-200 行。如果有人能解释可能发生的事情以及我如何解决它,我将不胜感激。
【问题讨论】: