【问题标题】:Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1Flink + Kafka, java.lang.OutOfMemoryError 当并行度 > 1
【发布时间】:2019-07-29 11:09:19
【问题描述】:

我有一个玩具 Flink 作业,它读取 3 个 kafka 主题,然后合并所有这 3 个流。就是这样,没有额外的工作。

如果在我的 Flink 作业中使用并行度 1,一切似乎都很好,只要我更改并行度 > 1,它就会失败:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

为什么它适用于并行度 1 而不是并行度 > 1?

是否与kafka服务器端设置有关?或者它与我的 java 代码中的消费者设置有关(我的代码中还没有特殊配置)?

我知道这里提供的信息可能还不够,但我无法触及 kafka 集群。我只是希望一些大师之前可能会遇到同样的错误,并且可以与我分享一些建议。

我使用的是 kafka 0.10,flink 1.5。

非常感谢。

【问题讨论】:

  • 如何设置并行度?env.setParallelism?

标签: apache-kafka apache-flink


【解决方案1】:

正如您在错误日志中看到的,此错误来自您的 Kafka 集群。当 Kafka 代理的直接缓冲区内存超过分配给 JVM 的堆大小时,会出现此问题。 Direct Buffer Memory 根据应用程序的需要从 JVM 的堆中分配。当你使用parallelism > 1时,多个Flink任务,min(Number of Flink Slots, Number of Kafka partitions)会同时消耗来自Kafka的数据,导致使用更多的Kafka brokers Heap size in比较并行度等于一的时候,就会发生所谓的错误。标准解决方案是通过将 KAFKA_HEAP_OPTS 变量添加到 Kafka env 文件 或作为 OS 环境变量 来增加 Kafka 代理可用的堆大小.例如,添加以下行将堆大小设置为 2 GB:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"

但是在您无法访问 Kafka 代理的情况下(根据您的问题),您可以减少在一次调用 poll() 时返回的记录数,因此代理中对堆内存的需求将是减少。 (这不是标准解决方案,我建议只是为了消除错误)。

来自this answer

Kafka Consumers通过以下两种方式处理数据积压 参数,

ma​​x.poll.interval.ms
poll() 调用之间的最大延迟 使用消费者组管理时。这设置了一个上限 消费者在获取更多信息之前可以空闲的时间量 记录。如果在此超时到期之前没有调用 poll(), 然后消费者被认为是失败的,该组将重新平衡 为了将分区重新分配给另一个成员。默认值为 300000.

ma​​x.poll.records
单次返回的最大记录数 调用 poll()。默认值为 500。

忽略按要求设置以上两个参数 可能导致轮询消费者可能没有的最大数据 能够处理可用资源,导致 OutOfMemory 或 有时无法提交消费者偏移量。因此,它总是 建议使用 max.poll.records 和 max.poll.interval.ms 参数。

所以对于测试,将 max.poll.records 的值减小到例如 250 并检查是否会发生错误。

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);

【讨论】:

  • 很好的回答,谢谢你的详细解释!我刚试了max.poll.records设置,再也看不到OOM了。还有一个问题,将其设置为小于默认值(500)的数字有什么副作用吗?
  • 很高兴看到它有帮助!除了减少在单个调用中返回的记录数会降低您的应用程序吞吐量之外,没有令人不安的副作用。这取决于您的应用程序需求。如果此设置满足您从 Kafka 读取数据并在 Flink 中处理它们的需求,那么这可能是您的最佳选择(但是您应该证明自己为什么使用分布式平台,也许顺序方法就足够了!),否则您应该自定义设置或使用更多强大的硬件。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-17
  • 1970-01-01
  • 2018-03-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多