【问题标题】:How to deduplicate messages from GCP PubSub in DataFlow using Apache Beam's PubSubIO withIdAttribute如何使用 Apache Beam 的 PubSubIO withIdAttribute 在 DataFlow 中对来自 GCP PubSub 的消息进行重复数据删除
【发布时间】:2018-05-30 02:59:53
【问题描述】:

我目前正在尝试使用 withIdAttributePubSubIO 对来自 PubSub 的消息进行重复数据删除(因为 PubSub 只保证至少一次传递)。

我的消息有四个字段,label1label2timestampvalue。在某个时间戳上,两个标签的值是唯一的。因此,我在写入 PubSub 之前另外设置了一个 uniqueID 属性,该属性等于作为字符串连接的这三个值。

例如,这是我使用 gcp 控制台工具从订阅中读取的结果。

┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                    DATA                                                   │   MESSAGE_ID   │                                               ATTRIBUTES                                          │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘

在我的 Beam 作业中,在 GCP Dataflow 上运行,我将这些消息解码为 json,将它们窗口化,按它们的两个标签对它们进行分组,然后尝试聚合它们。但是,在我的聚合类CreateMyAggregationsFn 中,我看到具有相同label1label2timestamp 的重复消息。

public class MyBeam {
  public interface MyBeanOptions extends PipelineOptions {
    // ...
  }

  private static class MyMessage implements Serializable {
    public long timestamp;
    public double value;
    public String label1;
    public String label2;
  }

  private static class CreateMyAggregationsFn extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      ArrayList<MyMessage> messages = new ArrayList<>();
      c.element().getValue().forEach(messages::add);
      Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));

      MyMessage prev = null
      for (MyMessage msg : messages) {
        if (prev != null &&
            msg.timestamp == prev.timestamp && 
            msg.label1.equals(prev.label1) && 
            msg.label2.equals(prev.label2)) {
            // ... identifying duplicates here
        }
        prev = msg;
      }
      ...
    }
  }

  public static void main(String[] args) throws IOException {
    MyBeamOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PubsubIO.Read<String> pubsubReadSubscription =
        PubsubIO.readStrings()
            .withTimestampAttribute("eventTime")
            .withIdAttribute("uniqueID")
            .fromSubscription(options.getPubsubSubscription());
    pipeline
        .apply("PubsubReadSubscription", pubsubReadSubscription)
        .apply("ParseJsons", ParseJsons.of(MyMessage.class))
        .setCoder(SerializableCoder.of(MyMessage.class))
        .apply(
            "Window",
            Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(3600)))
        .apply(
            "PairMessagesWithLabels",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                .via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
        .apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
        .apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()))
        // ...
    PipelineResult result = pipeline.run();
  }
}

是否有额外的步骤来使用我缺少的 withIdAttribute 方法从 PubSubIO 中删除消息?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam google-cloud-pubsub


    【解决方案1】:

    您正在指定accumulatingFiredPanes(),这意味着在一个窗口多次触发的情况下(例如,如果延迟数据到达),您要求连续触发包括之前触发的所有元素,而不仅仅是新元素。根据定义,这会产生重复。你想通过指定accumulatingFiredPanes() 来达到什么目的?

    【讨论】:

    • 是的。我希望在此过程的下游重复写入。结果被更新。然而,我看到重复的地方来自上游(来自 pubsub),它偶尔会向我提供两次相同的事件。而且,在我的窗口函数中,我的元素没有像我期望的那样被删除重复数据withIdAttribute
    • 你如何确定你收到了重复?您是否在其中一个 ParDos(哪一个?)中记录了事件 ID,并看到多次记录相同的 ID?
    • 感谢您的回复。是的。在CreateMyAggregationsFn parDo 中,我按时间戳排序并遍历它们,将每个label1label2timestamp 与前一个进行比较。目前,这是手动识别和删除消息的有效方法。不确定eventId 是什么意思。在我的用例中,当它点击CreateMyAggregationsFn 时,窗口化和分组的元素是MyMessage 类的实例,所以我正在检查这些值。我确信这些值与uniqueID 属性相匹配。你能建议一种方法来捕获下游的这个值来确认吗?
    • 我在CreateMyAggregationsFn 中更新了我的示例代码,以进一步阐明我如何识别重复项。
    • 嗯:您要求 PubsubIO.read() 根据“uniqueID”属性进行重复数据删除,但您是根据消息有效负载中的 JSON 字段识别重复项。您是否有可能有多条消息通过 Pubsub 到达,在有效负载中具有相同的 label1 和 label2,但具有不同的“uniqueID”属性?您可以通过在 PubsubReadSubscription 之后添加 ParDo 来调试它以记录完整的消息内容,并查看是否有任何特定的 uniqueID 出现两次。
    猜你喜欢
    • 2018-11-06
    • 2021-01-09
    • 1970-01-01
    • 1970-01-01
    • 2017-06-03
    • 2020-08-13
    • 2020-03-22
    • 2017-10-23
    • 2021-04-29
    相关资源
    最近更新 更多