[Posted]: 2019-09-05 12:33:04
[Question]:
I have a Kafka Streams application (Kafka Streams 2.1 + Kafka broker 2.0) that aggregates over TimeWindows, and I use the suppress operator to hold back the output of intermediate results.
Everything works fine until I restart the application. On restart it resets the offset of KTABLE-SUPPRESS-STATE-STORE to 0 to restore the suppression state, which is expected, but every restart throws an OutOfMemoryError. I assumed the heap was too small, so I raised Xmx/Xms; that survived one or two restarts, then the OutOfMemoryError came back. Xmx is now around 20 GB, so something feels wrong.
Code snippet:
TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);

KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
        .windowedBy(windows)
        .aggregate(MyStatistics::new,
            (sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
            Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
        .suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));
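For context, a sketch of the suppression buffer configurations available in the Kafka Streams 2.1 `Suppressed` API (using the `timeToWait` and `bufferMaxBytes` variables from the snippet above). With `untilTimeLimit` plus `maxBytes`, the buffer emits records early once it fills, trading strict time-based suppression for bounded memory; the strict variants either keep an unbounded buffer or shut the application down when the cap is reached:

```java
// Option A (as in the snippet above): emit early once the buffer reaches
// bufferMaxBytes, so memory stays bounded but suppression is not strict.
Suppressed<Windowed<String>> eager =
        Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes));

// Option B: strict suppression until the window closes, with an unbounded buffer.
Suppressed<Windowed> strictUnbounded =
        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded());

// Option C: strict suppression with a hard cap -- the application shuts down
// rather than exceed the configured byte limit.
Suppressed<Windowed> strictCapped =
        Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.maxBytes(bufferMaxBytes).shutDownWhenFull());
```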
I found that the record keys in KTABLE-SUPPRESS-STATE-STORE look like 1234567j�P, which is unreadable, but I guess each key is generated by combining the SN and the window. I think this makes KTABLE-SUPPRESS-STATE-STORE redundant, since there will be multiple records per SN, one for each window.
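As an illustration of why the key looks unreadable: windowed keys in Kafka Streams changelogs are typically the serialized record key followed by an 8-byte big-endian window-start timestamp, whose bytes are mostly unprintable. The exact layout of the suppress buffer's changelog may differ; this is a self-contained sketch of that general scheme, not the actual store code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowedKeySketch {
    // Encode a windowed key as [UTF-8 key bytes][8-byte big-endian window start].
    static byte[] encode(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    // Decode assumes the last 8 bytes hold the window-start timestamp.
    static long decodeWindowStart(byte[] bytes) {
        return ByteBuffer.wrap(bytes, bytes.length - Long.BYTES, Long.BYTES).getLong();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("1234567", 1_555_000_000_000L);
        // The trailing timestamp bytes are not printable characters, so the key
        // renders as "1234567" followed by garbage when viewed as a string.
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        System.out.println(decodeWindowStart(bytes));
    }
}
```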
I have two questions:

- Does the OutOfMemoryError mean the heap is too small? If so, how can I limit the restore rate? If not, what does it mean?
- Which API defines the key of KTABLE-SUPPRESS-STATE-STORE, and how can (or should) I control it?
Thanks!
Edit 2019/4/16
The error stack trace is:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
[Comments]:
Tags: java apache-kafka apache-kafka-streams