【问题标题】:Kafka producer hangs if topic doesn't exist如果主题不存在,Kafka 生产者会挂起
【发布时间】:2019-08-08 13:08:23
【问题描述】:

我是 Kafka 的新手,正在尝试实现一个简单的生产者,将数据发送到一个主题。 如果主题不存在,我想作为例外处理。

private Producer<UUID, Object> producer = createProducer(); 

private static Producer createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "mybootstrapserveraddress");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "ADAPTER");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            UUIDSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
            1000);
    props.put(ProducerConfig.RETRIES_CONFIG,
            1);
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
            1000);
    return new KafkaProducer<>(props);
}

public void send(Event event, String topic){
    try {
        UUID key = UUID.randomUUID();
        producer.send(new ProducerRecord<>(topic, key , event), (rm, ex) -> {
            if (ex != null) {
                log.warn("Error sending message with key {}\n{}", new Object[]{key, ex.getMessage()});
            } else {
                log.info( "Partition for key-value {} is {}", new Object[]{key, rm.partition()});
            }
        });
    } catch (Exception e) {
        log.error("Failed to send message ",e);
    } finally {
        producer.flush();
    }
}

但是,如果主题不存在,则继续轮询消息。 ProducerConfig 的超时和重试将被忽略。

[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 6 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 7 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 8 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}

我不想通过 AdminClient 检查主题是否存在。 Kafka 文档https://kafka.apache.org/documentation/#producerconfigs 没有任何帮助。

有没有办法解决这个问题?

【问题讨论】:

    标签: java apache-kafka kafka-producer-api


    【解决方案1】:

    当主题不存在时,获取元数据的重试应该在 60 秒后结束,默认情况下会在最后引发超时异常。 与此相关的生产者配置参数是max.block.ms(默认为 60000)。 据我所知,在减少此超时或使用 AdminClient(这是您不想做的事情)之前没有办法获得反馈。

    【讨论】:

    • 如果这仍然存在,您有什么见解吗?我在这里遇到了完全相同的问题,但是在这里设置 max.block.ms 没有任何效果。
    【解决方案2】:

    如果发布消息有任何问题,Kafka 将抛出一个MetadataNotUpdated 异常(它阻塞在send 方法上)。超时可使用:max.block.ms 进行配置。但是,请确保未禁用自动创建主题。

    【讨论】:

    • 其实是org.apache.kafka.common.errors.TimeoutException,不是MetadataNotUpdated异常。
    • 该死的对不起我的错误。你是对的。很久没看到这个异常了。
    猜你喜欢
    • 2018-04-19
    • 1970-01-01
    • 2017-02-05
    • 2020-12-11
    • 1970-01-01
    • 2017-08-20
    • 2020-06-25
    • 2021-08-26
    • 2021-10-22
    相关资源
    最近更新 更多