【问题标题】:Apache Beam - Flink runner - FileIO.write - issues in S3 writesApache Beam - Flink runner - FileIO.write - S3 写入中的问题
【发布时间】:2021-02-22 08:28:29
【问题描述】:

我目前正在研究一个 Beam 管道 (2.23) (Flink runner - 1.8),我们在其中读取 JSON 来自 Kafka 的事件并将 parquet 格式的输出写入 S3。

我们每 10 分钟后写入 S3。

我们观察到,如果我们更改 kafka,我们的管道有时会在进行小的非破坏性代码更改和部署管道后停止写入 S3 偏移并重新启动管道,它会再次开始写入 S3。

虽然 FileIO 不写入 s3,但 Pipeline 运行良好,没有任何错误/异常,并且它 处理记录直到 FileIO 阶段。它在日志中没有错误/异常 但在 FileIO 阶段默默地无法处理任何内容。

该阶段的水印也没有进展,它显示管道停止部署的时间的水印(保存点时间)

我们已经通过在窗口化后记录记录来检查我们的窗口化功能, 窗口工作正常。

此外,如果我们将 FileIO 替换为 Kafka 作为输出,管道运行良好,并在部署后继续向 kafka 输出记录。

这是我们的代码 sn-p -

parquetRecord.apply("Batch Events", Window.<GenericRecord>into(

FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.ZERO,
Window.ClosingBehavior.FIRE_ALWAYS)
                    .discardingFiredPanes())

                    .apply(Distinct.create())

                    .apply(FileIO.<GenericRecord>write()
                            .via(ParquetIO.sink(getOutput_schema()))
                            .to(outputPath.isEmpty() ? outputPath() :
outputPath)
                            .withNumShards(1)
                            .withNaming(new
CustomFileNaming("snappy.parquet")));

Flink 用户界面截图。它显示记录即将到来,直到 FileIO.Write。

这是不发送任何记录的阶段-

FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> FileIO.Write/WriteFiles/GatherTempFileResults/Add void
key/AddKeys/Map/ParMultiDo(Anonymous)

知道这里可能出了什么问题或 Beam/Flink 中有任何未解决的错误吗?

【问题讨论】:

    标签: apache-flink apache-beam apache-beam-io


    【解决方案1】:

    这个GroupByKey:https://github.com/apache/beam/blob/050b642b49f71a71434480c29272a64314992ee7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L674似乎没有输出

    这是因为默认情况下,输出会重新窗口化到全局窗口中,并且触发器设置为默认触发器。

    您需要将.withWindowedWrites 添加到您的FileIO 配置中。

    【讨论】:

    • 是否可以在FileIO.Write 中为GenericRecord 添加.withWindowedWrites。它给出了一个方法不可用的错误。有没有其他方法来指定它。另外,如果未指定 IgnoreWindowing 选项,我在 FileIO 中看到这是默认设置的。 github.com/apache/beam/blob/…。我很好奇为什么除了在代码更改后启动管道之外它一直都可以很好地写入 s3。即使管道运行良好,它是否始终使用全局窗口写入。
    • 如果我们从 DAG 中删除 Distinct.create() 转换,s3 写入在所有情况下(重启/代码更改)都可以正常工作
    【解决方案2】:

    您是否尝试过增加 .withNumShards(1)?我们有一个批处理用例失败,Shards 设置为 1。还从 FlinkRunner 写入 S3。我们认为这是 FlinkRunner 的一个错误。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-26
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多