【发布时间】:2021-03-08 06:24:44
【问题描述】:
我正在尝试通过 Kafka 发送通常大小为 2mb 的 base64 编码字符串。我已将 Spring Kafka 生产者配置如下:
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4194304);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
return new DefaultKafkaProducerFactory<>(configProps);
}
我不断收到以下错误:
Caused by: org.apache.kafka.clients.producer.BufferExhaustedException: Failed to allocate memory within the configured max blocking time 60000 ms.
我尝试过的事情:
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 90000);
还有
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 10);
还有
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
在自动创建主题时,连接到 kafka 代理。即使在尝试了上述修复的各种组合后,错误仍然存在。
【问题讨论】: