【问题标题】:Kafka cosumer poll not returning recording for low timeoutKafka 消费者轮询未返回低超时记录
【发布时间】:2017-06-23 06:07:02
【问题描述】:

Kafka 消费者轮询 API 未将记录返回到低超时。 如果我在轮询中增加超时值,那么记录就会到来。 我无法理解这个逻辑。请帮助,按照代码:

public ConsumerRecords<String, Map<String, String>> subscribeToQueue(String topic, QueueListener q) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.intuit.eventcollection.queue.KafkaJsonDeserializer");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest"); 

    // Figure out where to start processing messages from
    KafkaConsumer<String, Map<String, String>> kafkaConsumer = new KafkaConsumer<String, Map<String, String>>(
            props);
    kafkaConsumer.subscribe(Arrays.asList(topic));
    ConsumerRecords<String, Map<String, String>> records = null;
    // Start processing messages
    try {
        records = kafkaConsumer.poll(100);

【问题讨论】:

    标签: apache-kafka consumer


    【解决方案1】:

    如果在指定为轮询超时(超时)的时间段内没有发布新的未消费消息,则轮询将不返回任何内容。

    【讨论】:

    • 我看到超时定义为:- timeout - 如果数据不可用,等待轮询的时间(以毫秒为单位)。如果为 0,则立即返回任何现在可用的记录。不得为负。在测试时,我不断在主题中发送数据,并且在同一流程中,我尝试使用超时为 100 的消息。但主题没有返回任何内容。但是当我将超时时间增加到 200 时。然后消息开始从主题中传出。至于您的回复“在该时间段内没有发布新的未消费消息”,我在测试时无法理解。
    猜你喜欢
    • 2017-09-22
    • 2022-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-15
    • 1970-01-01
    • 2019-05-11
    • 1970-01-01
    相关资源
    最近更新 更多