【发布时间】:2021-02-18 04:54:30
【问题描述】:
以下是用于创建消费者的属性:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
TopicRecordNameStrategy.class.getName());
props.put("schema.registry.url", schemaRegistryUrl);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groipId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> java.lang.OutOfMemoryError: Java heap space 2020-11-05 12:07:31,002
> ERROR [consumer-10-C-1] [] org.springframework.core.log.LogAccessor:
> Stopping container due to an Error java.lang.OutOfMemoryError: Java
> heap space 2020-11-05 12:07:30,999 ERROR [consumer-5-C-1] []
> org.springframework.core.log.LogAccessor: Stopping container due to an
> Error java.lang.OutOfMemoryError: Java heap space at
> java.base/java.nio.HeapByteBuffer.<init>(Unknown Source) at
> java.base/java.nio.ByteBuffer.allocate(Unknown Source) at
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:550)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:236)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:469)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1109)
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1065)
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source) at java.base/java.util.concurrent.FutureTask.run(Unknown
> Source) at java.base/java.lang.Thread.run(Unknown Source)
>
>
> 2020-11-05 12:07:31,276 ERROR [consumer-5-C-1] []
> org.springframework.core.log.LogAccessor: Stopping container due to an
> Error java.lang.OutOfMemoryError: Direct buffer memory at
> java.base/java.nio.Bits.reserveMemory(Unknown Source) at
> java.base/java.nio.DirectByteBuffer.<init>(Unknown Source) at
> java.base/java.nio.ByteBuffer.allocateDirect(Unknown Source) at
> java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Unknown Source) at
> java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
> java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
> java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source) at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:550)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:236)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:469)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)
【问题讨论】:
-
仅仅显示属性并不能解决 OOM。您的消费者逻辑中的某些东西可能会导致这种情况。否则,您当前的堆大小是多少?您为增加它做了哪些努力?
-
所以我们有 2 个具有相同内存配置的环境,它在一个环境中运行良好,在另一个环境中面临问题。环境中唯一的区别是安全协议。在安全协议为 SSL 的情况下无法正常工作。
-
如果您没有启用 SSL,您应该收到 SSL 握手错误,而不是 OOM,但同样不清楚您的初始堆大小是多少
标签: java spring spring-boot apache-kafka