【问题标题】:OutOfMemoryError when restart my Kafka Streams appplication重新启动我的 Kafka Streams 应用程序时出现 OutOfMemoryError
【发布时间】:2019-09-05 12:33:04
【问题描述】:

我有一个 Kafka Streams 应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它基于 TimeWindows 进行聚合,我使用抑制运算符来抑制结果的输出。

在我重新启动我的应用程序之前一切正常,它会将 KTABLE-SUPPRESS-STATE-STORE 的偏移量重置为 0 以恢复抑制状态,如预期的那样。但是每次重启它都会抛出一个OutOfMemoryError,我想可能是堆大小不够,所以我用了一个更大的Xmx/Xms,它可以工作一两次重启,然后OutOfMemoryError又回来了.现在Xmx现在大概有20G了,我觉得有点不对劲。

代码sn-p:

TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);

KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
                .windowedBy(windows)
                .aggregate(MyStatistics::new,
                    (sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
                    Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
                .suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));

我发现KTABLE-SUPPRESS-STATE-STORE中的记录键是1234567j�P之类的东西,不可读,但我猜它是通过结合SN和窗口生成的,我认为这会使KTABLE- SUPPRESS-STATE-STORE 冗余,因为每个 SN 的每个窗口都会有多个记录。

我有两个问题:

  1. 如果OutOfMemoryError表示堆大小是否小,如果是,如何限制速率,如果不是,是什么意思?
  2. KTABLE-SUPPRESS-STATE-STORE 的键是由哪个 API 定义的,我应该如何或应该如何控制它?

谢谢!

2019/4/16 编辑

错误堆栈跟踪是:

java.lang.OutOfMemoryError: Java heap space        
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)        
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)        
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)        
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)        
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)        
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:467)        
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)        
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    如果OutOfMemoryError表示堆大小是否小,如果是,如何限制速率,如果不是,是什么意思?

    是的,没有足够的堆来分配应用程序需要运行的所有内存。我们不经常看到这种情况,并且抑制运算符是新的,所以我对此表示怀疑,但请记住,基本上您的应用程序中的任何数据结构都可能负责。

    诊断内存压力的最佳方法是执行“堆转储”。这基本上将您的 JVM 的整个内存复制到一个文件中,以便您可以使用像 https://www.eclipse.org/mat/ 这样的程序来分析它的内容。这将是一个学习曲线,但我认为您会发现一些分析内存使用情况的工具通常非常方便。

    您可以随时触发堆转储(有多种方法可以实现,您必须研究最适合自己的方法)。但是我认为当出现内存不足错误时,您会想要利用 Java 的漂​​亮选项进行堆转储。这样,您更有可能积极识别罪魁祸首。请参阅 https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr 或类似的 JVM。

    我可以推测堆转储的原因,但我担心我可能会误入歧途并浪费你的时间。获得转储结果后,我认为您应该继续在 Kafka 问题跟踪器中打开错误报告:https://issues.apache.org/jira/projects/KAFKA。然后,我们可以帮助找出如何解决该错误以让您再次运行,以及如何在未来的版本中修复它。

    实际上,我将提供一种推测...您可能会看到此错误的结果:https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895)。如果您在删除抑制运算符时 OOME 消失了,您可能希望暂时将其排除在外。一旦我们合并修复,我会请求发布错误修复版本,您可以重试以查看问题是否已解决。

    KTABLE-SUPPRESS-STATE-STORE 的 key 是由哪个 API 定义的,我应该如何或应该如何控制它?

    幸运的是,这有一个更直接的答案。您正在查看的密钥是记录密钥的二进制打包版本和窗口的时间戳。此密钥是您使用windowBy 的结果。在 Java 中,您可以看到聚合的结果是 KTable&lt;Windowed&lt;String&gt;, ...&gt;,并且 Suppress 不会更改键或值类型。换句话说,您正在查看密钥的序列化版本 (Windowed&lt;String&gt;)。

    暂时搁置压制;假设您有两个序列号,“asdf”和“zxcv”。假设您的窗口大小是一小时。您的应用程序在一天中的每个小时内(独立地)为这些序列号中的 每个 分组事件。因此,从 9:00 到 10:00 的所有“asdf”记录都有一个聚合,并且从 9:00 到 10:00 的所有“zxcv”记录也有一个聚合。因此,窗口化 KTable 中的键总数为 key space x number of windows being retained

    Suppression 运算符对 KTable 中的键数没有影响。其目的是在指定的时间内(timeToWait)抑制对这些键的更新。例如,在没有抑制的情况下,如果您在 9:00 到 10:00 之间对“asdf”记录进行了 3 次更新,则窗口聚合每次都会为(asdf, 9:00) 发出更新的结果,因此对于其中的 3 个事件,您会看到 3结果更新出来了。 Suppress 运算符只会阻止这些结果更新,直到 timeToWait 通过,当它通过时,它只发出最近的更新。

    所以,在任何时候,抑制缓冲区中的key的数量都小于上游KTable中的key的总数。它只包含在最近timeToWait 时间内更新的密钥。

    这有帮助吗?

    【讨论】:

    • 非常感谢约翰,您的解释很有帮助!对于第一个问题,我在JVM中添加了HeapDumpOnOutOfMemoryError进一步调查,昨天我删除了SUPPRESS主题,现在一切正常,如果再次出现我会观察几天,我会向Kafka社区报告.对于第二个问题,还有一件事我不清楚,比如说,如果我有两个序列号“abc”和“efg”,窗口大小是1小时,那么SUPPRESS主题将有2 * 24条记录,甚至几天过去了,既然 SUPPRESS 主题是紧凑日志,对吧?
    • 还有一点,由于 SUPPRESS topic 中的 record key 是一个序列化的Window&lt;String&gt;,它会包含开始时间和结束时间,那么对于 9:00-10:00 的记录,它就是 key每天都保持不变?我猜每个键的开始时间和结束时间每天都不一样,你能帮忙校准一下吗?谢谢!
    • 没问题,松坤。抑制运算符在发出结果后立即向更改日志主题发出墓碑,因此该主题(如缓冲区)只有“活动”窗口键的非墓碑记录。如果您有 1 小时的窗口,有 10 分钟的宽限期(例如),并且您有这两个键,那么您在任何时候都只有 4 个非墓碑记录。 (因为任何时候都有两个实时窗口)。
    • 第二个问题,为了便于阅读,我写的很简单。序列化的 Windowed Key 包含密钥的字节,以及该窗口的唯一开始时间,以自 unix 纪元(1970 年 1 月 1 日)开始以来的毫秒数为单位。所以窗口开始时间实际上在几天之间是唯一的。
    • 感谢约翰的回复。看了这个issue,我想我也遇到了同样的问题,重启后SUPPRESS话题越来越大,等这个补丁版本发布后,我会用它来验证我的猜测。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-16
    • 2018-07-17
    • 1970-01-01
    • 2019-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多