【Question Title】: Spring-kafka and kafka 0.10
【Posted】: 2017-09-07 16:19:00
【Question】:

I am currently trying to consume messages using kafka and spring-kafka.

But I am having trouble running multiple consumers for the same topic, and I have a couple of questions:

1 - My consumers tend to disconnect after a while and fail to reconnect

My consumers regularly get the following warning:

2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] f.b.poc.crawler.kafka.KafkaListener      : Consuming {"some-stuff": "yes"} from topic [job15]
2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService  : Start of crawling
2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService  : Url has already been treated ==> skipping
2017-09-06 15:32:35.054  WARN 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {job15-3=OffsetAndMetadata{offset=11547, metadata=''}, job15-2=OffsetAndMetadata{offset=15550, metadata=''}} failed for group group-3: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [job15-3, job15-2] for group group-3
2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] s.k.l.ConcurrentMessageListenerContainer : partitions revoked:[job15-3, job15-2]
2017-09-06 15:32:35.054  INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group group-3

This causes the consumer to stop and wait for several seconds.

As the message suggests, I increased the consumer's session.timeout.ms to something like 30000. I still get the message. And as you can see in the provided logs, the disconnection happens right after the record finishes its processing. So... well before 30 s of idleness.
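The warning actually points at max.poll.interval.ms rather than session.timeout.ms: everything returned by one poll() must be processed before the next poll(), or the broker considers the consumer dead and rebalances. A rough back-of-the-envelope check (the per-record time of 100 ms is an illustrative assumption, 500 is Kafka's default max.poll.records) shows how a 30000 ms poll interval can be overrun even though each single record is fast:

```java
public class PollBudget {
    // Worst-case time to process one batch returned by poll():
    // number of records in the batch times average per-record time.
    static long batchProcessingMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 30_000; // the value set in consumerConfigs()
        int maxPollRecords = 500;        // Kafka's default for max.poll.records
        long perRecordMs = 100;          // assumed: each crawl takes ~100 ms

        long batchMs = batchProcessingMs(maxPollRecords, perRecordMs);
        System.out.println(batchMs > maxPollIntervalMs); // prints "true": batch overruns the deadline
    }
}
```

With these assumed numbers one batch needs 50 s, so the group rebalances mid-batch; lowering max.poll.records (or raising max.poll.interval.ms) shrinks the batch back under the deadline.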

2 - The two consumer applications receive the same messages very often

When looking at the consumers' logs, I found that they tend to process the same messages. I know Kafka is at-least-once, but I never expected to run into this many duplicates. Fortunately I use redis, but I have probably misunderstood some tuning/properties I need to set.
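Since at-least-once delivery plus rebalances makes duplicates expected, the usual remedy is idempotent processing. A minimal sketch of dedupe-on-consume, using an in-memory set where the post's redis would sit (with redis the same atomic check-and-mark would typically be a `SADD`, which returns 1 only on first insertion); class and method names here are illustrative, not from the post:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DeduplicatingCrawler {
    // Stand-in for redis: in production this must be a store shared
    // between the consumer instances, or each instance dedupes alone.
    private final Set<String> seenUrls = ConcurrentHashMap.newKeySet();

    /** Returns true if the URL was crawled, false if it was a duplicate. */
    boolean crawlOnce(String url) {
        if (!seenUrls.add(url)) {
            return false; // already treated ==> skipping
        }
        // ... actual crawling work goes here ...
        return true;
    }
}
```

This makes redelivered records harmless, which matters more than eliminating redelivery itself.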

Code

Note: I use the ConcurrentMessageListenerContainer with auto-commit=true, but run it with 1 thread. I just start several instances of the same application, because the service used by the consumer is not thread-safe.

KafkaContext.java

@Slf4j
@Configuration
@EnableConfigurationProperties(value = KafkaConfig.class)
class KafkaContext {

    @Bean(destroyMethod = "stop")
    public ConcurrentMessageListenerContainer kafkaInListener(IKafkaListener listener, KafkaConfig config) {
        final ContainerProperties containerProperties =
                new ContainerProperties(config.getIn().getTopic());
        containerProperties.setMessageListener(listener);
        final DefaultKafkaConsumerFactory<Integer, String> defaultKafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs(config));

        final ConcurrentMessageListenerContainer messageListenerContainer =
                new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties);

        messageListenerContainer.setConcurrency(config.getConcurrency());
        messageListenerContainer.setAutoStartup(false);
        return messageListenerContainer;
    }

    private Map<String, Object> consumerConfigs(KafkaConfig config) {
        final String kafkaHost = config.getHost() + ":" + config.getPort();
        log.info("Crawler_Worker connecting to kafka at {} with consumerGroup {}", kafkaHost, config.getIn().getGroupId());
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
        return props;
    }

}

Listener

@Slf4j
@Component
class KafkaListener implements IKafkaListener {

    private final ICrawlingService crawlingService;

    @Autowired
    public KafkaListener(ICrawlingService crawlingService) {
        this.crawlingService = crawlingService;
    }

    @Override
    public void onMessage(ConsumerRecord<Integer, Next> consumerRecord) {
        log.info("Consuming {} from topic [{}]", JSONObject.wrap(consumerRecord.value()), consumerRecord.topic());

        crawlingService.apply(consumerRecord.value());
    }
}

【Comments】:

Tags: spring apache-kafka kafka-consumer-api


    【Solution 1】:

    The main problem here is that your consumer group is continuously being rebalanced. You are right about increasing session.timeout.ms, but I don't see this setting applied anywhere in your configuration. Try removing:

    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
    

    and setting:

    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    

    You can increase MAX_POLL_RECORDS_CONFIG for better performance when communicating with the broker. But if you process messages in a single thread only, it is safer to keep this value low.
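    Put together, the consumerConfigs() method from the question could look as follows. This is a sketch: 30000 and 10 are the values suggested above, everything else is carried over unchanged from the question's code (including its KafkaConfig and JacksonNextSerializer types):

```java
private Map<String, Object> consumerConfigs(KafkaConfig config) {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getHost() + ":" + config.getPort());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class);
    // MAX_POLL_INTERVAL_MS_CONFIG=30000 removed: it shortened the poll deadline
    // instead of extending the session timeout.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // small batches for a single-threaded listener
    return props;
}
```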

    【Discussion】:

    • Hi, sometimes someone just has to look over your code... Thank you for pointing out that I picked the wrong parameter. I will try it today and keep this question updated!