【问题标题】:Google Pub/Sub to Dataflow, avoid duplicates with Record IDGoogle Pub/Sub to Dataflow,避免与记录 ID 重复
【发布时间】:2017-06-18 11:53:52
【问题描述】:

我正在尝试构建一个流式数据流作业,它从 Pub/Sub 读取事件并将它们写入 BigQuery。

根据文档,如果使用记录 ID,Dataflow 可以检测重复的消息传递(请参阅:https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids

但是即使使用这个记录 ID,我仍然有一些重复 (大约 0.0002%)。

我错过了什么吗?

编辑:

我使用Spotify Async PubSub Client 发布带有以下snipplet 的消息:

Message
      .builder()
      .data(new String(Base64.encodeBase64(json.getBytes())))
      .attributes("myid", id, "mytimestamp", timestamp.toString)
      .build()

然后我使用Spotify scio 从 pub/sub 读取消息并将其保存到 DataFlow:

val input = sc.withName("ReadFromSubscription")
              .pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
    .withName("FixedWindow")
    .withFixedWindows(windowSize)  // apply windowing logic
    .toWindowed  // convert to WindowedSCollection
    //
    .withName("ParseJson")
    .map { wv =>
      wv.copy(value = TableRow(
        "message_id" -> (Json.parse(wv.value) \ "id").as[String],
        "message" -> wv.value)
      )
    }
    //
    .toSCollection  // convert back to normal SCollection
    //
    .withName("SaveToBigQuery")
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

窗口大小为 1 分钟。

在注入消息几秒钟后,我已经在 BigQuery 中找到了重复消息。

我使用这个查询来计算重复:

SELECT 
   COUNT(message_id) AS TOTAL, 
   COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table

//returning 273666  273564

还有这个来看看他们:

SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
  SELECT message_id
  FROM my_dataset.my_table
  GROUP BY message_id
  HAVING COUNT(*) > 1
) ORDER BY message_id

//returning for instance:
row|id                                    | processed_at           | processed_at_epoch    
1   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410367   { ...json1... }  
2   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410368   { ...json1... }  
3   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973907   { ...json2... }  
4   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973741   { ...json2... } 
5   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:09:10 UTC 1486044550489   { ...json3... } 
6   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:08:52 UTC 1486044532680   { ...json3... }

【问题讨论】:

  • 您能否详细说明您如何使用记录 ID 和测量重复项?请注意文档中的“Dataflow 不会对发布到 Pub/Sub 间隔超过 10 分钟的具有相同记录 ID 值的消息执行此重复数据删除。”这会导致您观察到重复吗?
  • 我添加了更多信息 :)

标签: google-bigquery google-cloud-platform google-cloud-dataflow google-cloud-pubsub spotify-scio


【解决方案1】:

BigQuery documentation states 可能会出现重复到达的罕见情况:

  1. “BigQuery 记住此 ID 至少一分钟”- 如果 Dataflow 在重试插入之前花费超过一分钟 BigQuery 可能允许重复输入。您可以查看管道中的日志以确定这是否是这样的。
  2. “在 Google 数据中心意外断开连接的罕见情况下,可能无法进行自动重复数据删除。”

您可能想尝试manually removing duplicates 的说明。这也将允许您查看与每行一起使用的 insertID,以确定问题是在 Dataflow 端(为同一记录生成不同的 insertIDs)还是在 BigQuery 端(未能基于重复数据删除行)在他们的insertID)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-08-20
    • 2021-06-09
    • 2017-09-07
    • 1970-01-01
    • 2018-08-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多