【问题标题】:Spark Streaming dropDuplicatesSpark Streaming dropDuplicates
【发布时间】:2017-12-30 16:10:16
【问题描述】:

Spark 2.1.1 (scala api) 从 s3 位置流式传输 json 文件。

我想根据每个记录的 json 中的 ID 列 (“event_id”) 对所有传入记录进行重复数据删除。我不在乎保留哪条记录,即使记录的重复只是部分的。我正在使用附加模式,因为数据只是通过 spark.sql() 方法进行丰富/过滤,没有分组/窗口聚合。然后我使用附加模式将 parquet 文件写入 s3。

根据文档,我应该能够使用不带水印的 dropDuplicates 来进行重复数据删除(显然这在长时间运行的生产中无效)。但是,这会失败并出现错误:

用户类抛出异常:org.apache.spark.sql.AnalysisException:流数据帧/数据集上存在流聚合时不支持附加输出模式

这个错误看起来很奇怪,因为我没有进行聚合(除非 dropDuplicates 或 sparkSQL 算作聚合?)。

我知道重复不会在 3 天之后发生,所以我通过添加水印再次尝试(通过在 drop 重复之前立即使用 .withWatermark())。但是,它似乎要等到 3 天后才能写入数据。 (即,由于今天是 7 月 24 日,因此仅将截至 7 月 21 日同一时间的数据写入输出)。

由于没有聚合,我想在批处理处理后立即写入每一行,并简单地丢弃前 3 天发生的具有事件 id 的任何行。有没有一种简单的方法可以做到这一点?

谢谢

【问题讨论】:

    标签: scala hadoop apache-spark spark-streaming


    【解决方案1】:

    就我而言,我曾经通过 DStream 以两种方式实现:

    一种方式:

    1. 加载tmp_data(包含3天的唯一数据,见下文)
    2. 接收 batch_data 并使用 tmp_data 执行 leftOuterJoin
    3. 在 step2 上执行filter 并输出新的唯一数据
    4. 通过 step2 的结果用新的唯一数据更新 tmp_data 并删除旧数据(超过 3 天)
    5. tmp_data 保存在 HDFS 或其他任何地方
    6. 一遍又一遍地重复以上内容

    另一种方式:

    1. 在mysql上创建一个表并在event_id上设置UNIQUE INDEX
    2. 接收 batch_data 并将 event_id + event_time +whatever 保存到 mysql
    3. mysql 会自动忽略重复项

    【讨论】:

      【解决方案2】:

      我们使用的解决方案是 org.apache.spark.sql.execution.streaming.Sink 的自定义实现,它在批处理中删除重复项并针对前几天的数据执行左反连接后插入配置单元表目标配置单元表。

      【讨论】:

        猜你喜欢
        • 2016-06-10
        • 1970-01-01
        • 1970-01-01
        • 2020-03-19
        • 1970-01-01
        • 2017-11-14
        • 1970-01-01
        • 2016-06-13
        • 2015-12-18
        相关资源
        最近更新 更多