【问题标题】:Flink reduce shuffling overhead and memory consumptionFlink 减少 shuffle 开销和内存消耗
【发布时间】:2023-03-30 20:00:03
【问题描述】:

我的 Flink 工作经常与一个或另一个任务管理器一起 OOM。我有足够的内存和存储空间来完成我的工作(2 个 JobManagers/16 个 TaskManagers - 每个都有 15 核和 63GB RAM)。有时作业在抛出 OOM 之前运行 4 天,有时作业在 2 天内进入 OOM。但与前几天相比,流量是稳定的。

我收到了一条建议,建议不要通过流式管道中的对象,而是使用原语来减少洗牌开销和内存消耗。

我工作的 flink 工作是用 Java 编写的。可以说下面是我的管道

Kafka source
    deserialize (converted bytes to java object, the object contains String, int, long types)

    FirstKeyedWindow    (the above serialized java objects received here)
        reduce

    SecondKeyedWindow (the above reduced java objects received here)
        reduce

Kafka sink (above java objects are serialized into bytes and are produced to kafka)

我的问题是我应该考虑什么来减少开销和内存消耗? 用 char 数组替换 String 是否有助于减少开销?或者 我应该只处理整个管道中的字节吗? 如果我在 KeyedWindows 之间序列化对象,是否有助于减少开销?但是如果我必须读回字节,那么我需要反序列化,根据需要使用然后序列化它。它不会产生更多的序列化/反序列化开销吗?

感谢您的建议。注意,我说的是每天接收 10TB 的数据。

更新 1:

我看到的OOM异常如下:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:远程任务管理器“主机/主机:端口”意外关闭了连接。这可能表明远程任务管理器丢失了。

回答大卫安德森cmets下面: 使用的 Flink 版本是 v1.11 使用的状态后端是 RocksDB,基于文件系统。该作业的堆内存不足。来自 Kafka 源的每条消息的大小最大为 300 字节。 reduce 函数执行重复数据删除(删除同一组中的重复项),第二个 reduce 函数执行聚合(更新对象内的计数)。

更新 2:

经过深入探索,我发现 Flink 使用的是 Kyro 默认的序列化器,效率低下。我知道custom_serializers 可以帮助减少开销,如果我们定义一个而不是使用 Kyro 默认值。我现在正在尝试 google-protobuf 看看它是否表现更好。

而且,我也期待增加适合我的工作并行性的taskmanager.network.memory.fraction。尚未找到正确的计算来设置上述配置。

【问题讨论】:

  • 这可能取决于窗口大小。如果您正在使用时间窗口,那么您可能会在某个窗口收到更多您的 TaskManager 可以处理的字节。如果您想加快(反)序列化,使用 Avro 可能是值得的。
  • 请给我们更多信息。您使用的是哪个 state 后端,以及 Flink 的哪个版本?你得到什么错误,确切地说——你的堆用完了吗?只是为了确认:这与 DataStream API 有关吗?我怀疑这与您执行 ser/de 的方式有关,但这些对象有多大?
  • 还有一件事:窗口缩减函数——你要缩减成什么样的数据结构?
  • 用您解决的 cmets 更新了我的问题。谢谢。

标签: apache-kafka apache-flink


【解决方案1】:

在我尝试的方法对我有用之后,我在这里回答我自己的问题。我在 Grafana 中发现了与我的 Flink 工作相关的额外指标。其中两个指标是GC 时间和 GC 计数。我在 GC(垃圾收集)指标中看到了一些好的峰值。原因可能是,我在作业管道中创建了一些新的对象。考虑到我正在处理的 TB 数据和每天 200 亿条记录,这个对象的创建出了问题。我已经对其进行了优化,以尽可能多地重用对象,从而减少了内存消耗。

我已将 taskmanager.network.memory 增加到所需的值,默认设置为 1GB。

在我上面的问题中,我谈到了自定义序列化程序以减少网络开销。我尝试使用 Kyro 实现 protobuf 序列化程序,并且 protobuf 生成的类是 final。如果我必须更新对象,我必须创建新对象,这将在 GC 指标中产生峰值。所以,避免使用它。也许我可以进一步更改 protobuf 生成类以满足我的需要。如果情况不一致,将考虑该步骤。

【讨论】:

    猜你喜欢
    • 2013-06-13
    • 1970-01-01
    • 2023-03-18
    • 2015-05-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多