【发布时间】:2021-02-22 08:28:29
【问题描述】:
我目前正在研究一个 Beam 管道 (2.23) (Flink runner - 1.8),我们在其中读取 JSON 来自 Kafka 的事件并将 parquet 格式的输出写入 S3。
我们每 10 分钟后写入 S3。
我们观察到,如果我们更改 kafka,我们的管道有时会在进行小的非破坏性代码更改和部署管道后停止写入 S3 偏移并重新启动管道,它会再次开始写入 S3。
虽然 FileIO 不写入 s3,但 Pipeline 运行良好,没有任何错误/异常,并且它 处理记录直到 FileIO 阶段。它在日志中没有错误/异常 但在 FileIO 阶段默默地无法处理任何内容。
该阶段的水印也没有进展,它显示管道停止部署的时间的水印(保存点时间)
我们已经通过在窗口化后记录记录来检查我们的窗口化功能, 窗口工作正常。
此外,如果我们将 FileIO 替换为 Kafka 作为输出,管道运行良好,并在部署后继续向 kafka 输出记录。
这是我们的代码 sn-p -
parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO,
Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
.apply(Distinct.create())
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath.isEmpty() ? outputPath() :
outputPath)
.withNumShards(1)
.withNaming(new
CustomFileNaming("snappy.parquet")));
Flink 用户界面截图。它显示记录即将到来,直到 FileIO.Write。
这是不发送任何记录的阶段-
FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> FileIO.Write/WriteFiles/GatherTempFileResults/Add void
key/AddKeys/Map/ParMultiDo(Anonymous)
知道这里可能出了什么问题或 Beam/Flink 中有任何未解决的错误吗?
【问题讨论】:
标签: apache-flink apache-beam apache-beam-io