【问题标题】:PubsubIO , msg exceeding max size, how to perform error handlingPubsubIO , msg 超过最大大小,如何执行错误处理
【发布时间】:2019-06-09 09:21:26
【问题描述】:

我们在 GCP Dataflow 中运行管道,并遇到了 pubsub 消息 [1] 的最大消息大小 当这种情况发生时,管道延迟时间将开始增加,最终停止运行......

此日志消息是在 GCP 堆栈驱动程序中的“dataflow_step”下生成的,

我的问题,有没有办法在管道中定义错误处理...

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

类似

.onError(...perform error handling ...)

以与 Java8 流 api 类似的流畅方式。这将允许管道继续使用 pubsub 限制内的输出。

非常欢迎使用其他解决方案来处理这种情况。

谢谢你, 克里斯托夫·布希耶

[1] 由于验证错误而无法提交请求:generic::invalid_argument: Pubsub 发布请求限制为 10MB,拒绝超过 7MB 的消息以避免超出 byte64 请求编码的限制。

【问题讨论】:

    标签: apache-beam google-cloud-pubsub dataflow


    【解决方案1】:

    对于 Dataflow 上的 PubsubIO 的特殊情况,请注意 Dataflow 会覆盖 PubsubIO 并处理对 Pubsub 的读取和写入消息,作为其流式实现的一部分。由于这种替换,我看到您正在讨论的相同错误显示在“shuffler”而不是“worker”下的日志中。

    我通过在 PubsubIO.write() 步骤之前实现自定义转换来解决相同的问题。此 LimitPayloadSize 转换仅检查 PubsubMessage 中有多少字节,并且只允许通过有效负载小于 7 MB 的消息。

    目前还没有一个流畅的 API 用于转换中的错误处理,尽管这已经讨论过了。目前,公认的模式是定义具有多个输出集合的转换,然后将失败消息的集合写入其他地方(例如通过 FileIO 的 GCS)。您可以将其实现为裸 DoFn,或者您可以查看 Partition:

    PCollectionList<PubsubMessage> limitedPayloads = input
            .apply("Limit payload size",
                    Partition
                            .of(2, new PartitionFn<PubsubMessage>() {
      public int partitionFor(PubsubMessage message, int numPartitions) {
        return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
      }
    }));
    limitedPayloads.get(0).apply(PubsubIO.write()...);
    limitedPayloads.get(1).apply(FileIO.write()...);
    

    【讨论】:

      猜你喜欢
      • 2023-03-24
      • 2017-01-23
      • 1970-01-01
      • 2021-03-04
      • 2017-10-09
      • 1970-01-01
      • 2022-12-04
      • 2021-01-27
      • 2019-03-04
      相关资源
      最近更新 更多