【发布时间】:2017-12-30 16:10:16
【问题描述】:
Spark 2.1.1 (scala api) 从 s3 位置流式传输 json 文件。
我想根据每个记录的 json 中的 ID 列 (“event_id”) 对所有传入记录进行重复数据删除。我不在乎保留哪条记录,即使记录的重复只是部分的。我正在使用附加模式,因为数据只是通过 spark.sql() 方法进行丰富/过滤,没有分组/窗口聚合。然后我使用附加模式将 parquet 文件写入 s3。
根据文档,我应该能够使用不带水印的 dropDuplicates 来进行重复数据删除(显然这在长时间运行的生产中无效)。但是,这会失败并出现错误:
用户类抛出异常:org.apache.spark.sql.AnalysisException:流数据帧/数据集上存在流聚合时不支持附加输出模式
这个错误看起来很奇怪,因为我没有进行聚合(除非 dropDuplicates 或 sparkSQL 算作聚合?)。
我知道重复不会在 3 天之后发生,所以我通过添加水印再次尝试(通过在 drop 重复之前立即使用 .withWatermark())。但是,它似乎要等到 3 天后才能写入数据。 (即,由于今天是 7 月 24 日,因此仅将截至 7 月 21 日同一时间的数据写入输出)。
由于没有聚合,我想在批处理处理后立即写入每一行,并简单地丢弃前 3 天发生的具有事件 id 的任何行。有没有一种简单的方法可以做到这一点?
谢谢
【问题讨论】:
标签: scala hadoop apache-spark spark-streaming