【问题标题】:How to write to different files based on content for batch processing in Flink?如何根据内容写入不同的文件以在 Flink 中进行批处理?
【发布时间】:2026-01-20 00:45:01
【问题描述】:

我正在尝试处理 HDFS 上的一些文件并将结果也写回 HDFS。在作业开始之前已经准备好文件。问题是我想根据文件内容写入不同的路径和文件。我知道 BucketingSink(doc here) 是为了在 Flink 流中实现这一点而提供的。但是,Dataset 似乎没有类似的 API。我在 * 上发现了一些问答。(123)。现在我想我有两个选择:

  1. 使用 Hadoop API:MultipleTextOutputFormatMultipleOutputs
  2. 以流的形式读取文件并使用BucketingSink

我的问题是如何在它们之间做出选择,还是有其他解决方案?任何帮助表示赞赏。

编辑:这个问题可能与this 重复。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我们遇到了同样的问题。我们也很惊讶DataSet 不支持addSink()

    我建议不要切换到流媒体模式。您可能会放弃一些在批处理模式下可用的优化(即内存池)。

    您可能必须实现自己的 OutputFormat 来进行分桶。

    相反,您可以扩展OutputFormat[YOUR_RECORD](或RichOutputFormat[]),您仍然可以使用BucketAssigner[YOUR_RECORD, String] 打开/写入/关闭输出流。

    这就是我们所做的,而且效果很好。

    我希望 flink 能尽快在 Batch Mode 中支持这一点。

    【讨论】:

    • 非常感谢。我会试一试的。
    最近更新 更多