[Question Title]: No current assignment for partition occurs even after poll in Kafka
[Posted]: 2019-06-26 02:27:34
[Question]:

I have a Java 8 application that uses Apache Kafka 2.11-0.10.1.0. I need to use the seek feature to poll old messages from a partition. However, every time I try to seek by offset I get a No current assignment for partition exception. Here is the class responsible for seeking a topic to the specified timestamp:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
 * to some offset which is determined either by timestamp or by offset number.
 */
public class KafkaSeeker {
    public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    private final KafkaConsumer<String, String> kafkaConsumer;
    private ConsumerRecords<String, String> polledRecords;

    public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
        this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
    }

    /**
     * For each assigned or subscribed topic, moves the fetching pointer via
     * {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
     * to the offset corresponding to the specified {@code timestamp}.
     * If no messages are found in a partition for a topic,
     * {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} is called instead.
     *
     * Because {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} are lazy,
     * this method needs to execute a dummy {@link KafkaConsumer#poll(long)} call first. All {@link ConsumerRecords}
     * polled from the buffer are swallowed and produce warning logs.
     *
     * @param timestamp is used to find proper offset to seek to
     * @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
     */
    public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
        this.polledRecords = kafkaConsumer.poll(0);
        Collection<TopicPartition> topicPartitions;
        if (CollectionUtils.isEmpty(topics)) {
            topicPartitions = kafkaConsumer.assignment();
        } else {
            topicPartitions = topics.stream()
                    .map(it -> {
                        List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
                                .map(PartitionInfo::partition).collect(Collectors.toList());
                        return partitions.stream().map(partition -> new TopicPartition(it, partition));
                    })
                    .flatMap(it -> it)
                    .collect(Collectors.toList());
        }

        if (topicPartitions.isEmpty()) {
            throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
        }

        Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
                .collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            if (entry.getValue() != null) {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
                        topicPartition.topic(),
                        topicPartition.partition(),
                        beginningOffsets.get(topicPartition),
                        entry.getValue());
                kafkaConsumer.seek(topicPartition, entry.getValue().offset());
            } else {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
                        topicPartition.topic(),
                        topicPartition.partition());
                kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            }
        }
        return offsets;
    }

    public ConsumerRecords<String, String> getPolledRecords() {
        return polledRecords;
    }
}

Before calling that method, I have the consumer subscribed to a topic like consumer.subscribe(singletonList(kafkaTopic));. When I call kafkaConsumer.assignment(), it returns zero assigned TopicPartitions. However, if I specify the topics and fetch their partitions explicitly, I do get valid TopicPartitions, although they fail on the seek call with the error from the title. What am I forgetting?
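For context, the race described above can be reproduced in a few lines. This is only a sketch; the broker address, group id, and topic name are placeholders, and it assumes the same kafka-clients library as the class above. subscribe() is lazy, and a single poll(0) may return before the group rebalance has completed, so assignment() can still be empty and seek() then throws the exception from the title.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekTooEarly {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            consumer.poll(0); // may return before any partitions are assigned
            System.out.println("Assignment after poll(0): " + consumer.assignment()); // often empty
            // Throws IllegalStateException: No current assignment for partition some-topic-0
            // whenever the rebalance has not completed yet:
            consumer.seek(new TopicPartition("some-topic", 0), 0L);
        }
    }
}
```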

[Comments]:

    Tags: java apache-kafka kafka-consumer-api


    [Solution 1]:

    The reliable way to seek and to check the current assignment is to wait for the onPartitionsAssigned() callback after subscribing. On a newly created (still disconnected) consumer, calling poll() once does not guarantee that it will immediately be connected and assigned partitions.

    As a basic example, see the code below, which subscribes to a topic and seeks to the desired position in the assignment callback. You will notice that the poll loop at the end correctly sees only records from the seeked position, not from the previously committed or reset offset.

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekOnAssignment {

        // Target offset for each partition we want to seek to
        public static final Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("testtopic", 0), 5L);

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

                consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {

                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Assigned " + partitions);
                        for (TopicPartition tp : partitions) {
                            OffsetAndMetadata oam = consumer.committed(tp);
                            if (oam != null) {
                                System.out.println("Current offset is " + oam.offset());
                            } else {
                                System.out.println("No committed offsets");
                            }
                            Long offset = offsets.get(tp);
                            if (offset != null) {
                                System.out.println("Seeking to " + offset);
                                consumer.seek(tp, offset);
                            }
                        }
                    }
                });

                for (int i = 0; i < 10; i++) {
                    System.out.println("Calling poll");
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
                    }
                }
            }
        }
    }
    

    [Comments]:

    • Your answer seems correct, but with this approach I get a java.lang.IllegalStateException: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@5b202a3a has been closed already exception
    • This answer applies to vanilla Kafka applications. Your error about Spring's ApplicationContext being closed is an unrelated issue.
    • The internet is full of examples that use poll to obtain the assignment. I finally found this answer. I hope it floats to the top of the Google results; it could save a lot of people a lot of time.
    • @Ryan well said... Almost all the documentation and every reference I found was complete garbage. Wish I had stumbled onto this earlier...
    • I think that in the provided example a couple of poll calls may complete before the ConsumerRebalanceListener runs and the seek is executed. There should be some "skip until the seek is done" mechanism before consuming. Perhaps using a CompletableFuture or the like.
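    Regarding the last comment, one way to implement such a "skip until the seek is done" gate is a CompletableFuture completed inside the rebalance listener. This is only a sketch, reusing the placeholder topic, offset, and broker values from the answer above:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GatedSeekConsumer {
    public static void main(String[] args) {
        // Completed once the listener has finished seeking; until then,
        // any polled records are discarded.
        CompletableFuture<Void> seeked = new CompletableFuture<>();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gated-seek-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    for (TopicPartition tp : partitions) {
                        consumer.seek(tp, 5L); // desired start offset
                    }
                    seeked.complete(null); // from now on it is safe to process
                }
            });

            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
                if (!seeked.isDone()) {
                    continue; // polled before any seek took effect: skip
                }
                records.forEach(r -> System.out.println(r.partition() + "@" + r.offset()));
            }
        }
    }
}
```

    Note that the listener runs synchronously inside poll(), so by the time poll() returns records for newly assigned partitions the future is already completed; the gate simply makes the "nothing is processed before the seek" invariant explicit.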
    [Solution 2]:
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    String topic = "testtopic";
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    // Get the topic's partitions
    List<TopicPartition> partitions = consumer
            .partitionsFor(topic)
            .stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList());
    // Explicitly assign the partitions to our consumer
    consumer.assign(partitions);
    // seek, query offsets, or poll

    Note that this disables consumer group management and rebalancing. Use @Mickael Maison's approach whenever possible.
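    Tying this back to the original question, the asker's timestamp-based seek could be sketched on top of manual assignment like this. The topic name, broker address, and the one-hour lookback are assumptions, not values from the question:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignAndSeekByTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        String topic = "testtopic";
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(pi -> new TopicPartition(topic, pi.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions); // takes effect immediately, no rebalance to wait for

            long timestamp = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();
            Map<TopicPartition, Long> query = partitions.stream()
                    .collect(Collectors.toMap(Function.identity(), tp -> timestamp));

            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : consumer.offsetsForTimes(query).entrySet()) {
                if (e.getValue() != null) {
                    // The partition is assigned, so seek() no longer throws IllegalStateException
                    consumer.seek(e.getKey(), e.getValue().offset());
                } else {
                    // No message at or after the timestamp: go to the end of the partition
                    consumer.seekToEnd(Collections.singleton(e.getKey()));
                }
            }
        }
    }
}
```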

    [Comments]:
