【问题标题】:Apache beam WithTimestamps: Output timestamps must be no earlier than timestamp of current inputApache beam WithTimestamps:输出时间戳不得早于当前输入的时间戳
【发布时间】:2019-07-20 12:21:51
【问题描述】:

我正在尝试以 10 秒的频率从谷歌云 pubsub 流中显示数据,但是我收到此错误:

java.lang.IllegalArgumentException:无法输出时间戳为 2019-07-20T12:13:04.875Z。输出时间戳不得早于当前输入的时间戳 (2019-07-20T12:13:05.591Z) 减去允许的偏差(0 毫秒)。有关更改允许的倾斜的详细信息,请参阅 DoFn#getAllowedTimestampSkew() Javadoc。 org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:587) org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:566) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80) org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn.processElement(WithTimestamps.java:136)

这是导致错误的代码:

eventStream
  .apply("Add Event Timestamps",
    WithTimestamps.of((Event event) -> new Instant(event.getTime())))
  .apply("Window Events",
    Window.<Event>into(FixedWindows.of(Duration.parseDuration("10s"))));

这是什么原因,什么是合适的解决方案?

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam windowing


    【解决方案1】:

    来自文档:

    如果输入 {@link PCollection} 元素有时间戳,则输出 每个元素的时间戳不能在输入元素的时间戳之前 时间戳减去 {@link getAllowedTimestampSkew()} 的值。如果 输出时间戳在此时间之前,转换将抛出一个 {@link IllegalArgumentException} 执行时。使用 {@link withAllowedTimestampSkew(Duration)} 更新允许的偏差。

    注意:使用 {@link #withAllowedTimestampSkew(Duration)} 许可 要在水印后面发出的元素。这些元素是 考虑晚了,如果在 {@link Window#withAllowedLateness(Duration) allowed lateness} 下游 {@link PCollection} 可能会被静默删除。

    所以,要解决这个问题,您可以使用withAllowedTimestampSkew

    我使用了不同的 API:withTimestampAttribute。 您可以在 JSON/AVRO 中设置一个包含时间戳字段的属性。

    此 API 在发布时可用:

      .apply(PubsubIO.writeAvros(Someclass.class)
             .withIdAttribute("id")
             .withTimestampAttribute("myTime").to(topic));
    

    订阅时:

    .apply(PubsubIO.readAvros(Someclass.class) .fromSubscription(...)
           .withIdAttribute("id").withTimestampAttribute("myTime"))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-08-15
      • 1970-01-01
      • 2020-09-28
      • 1970-01-01
      • 2010-12-02
      • 2023-03-14
      • 2022-11-17
      • 1970-01-01
      相关资源
      最近更新 更多