【问题标题】:Spring-Kafka Consumer Group Coordination for ConcurrentKafkaListenerContainerFactoryConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调
【发布时间】:2017-10-16 07:18:27
【问题描述】:

我有几个关于 spring-kafka 在某些情况下的行为的问题。任何答案或指示都会很棒。

背景:我正在构建一个 kafka 消费者,它与外部 API 对话并发送回确认。我的配置如下所示:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getString("kafka-generic.consumer.group.id"));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "6000000");

    return props;
}


@Bean
public RetryTemplate retryTemplate() {
    final ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
    backOffPolicy.setInitialInterval(this.configuration.getLong("retry-exp-backoff-init-interval"));
    final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(this.configuration.getInt("retry-max-attempts"));
    final RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(this.configuration.getInt("kafka-concurrency"));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setIdleEventInterval(this.configuration.getLong("kafka-rtm-idle-time"));
    //factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(kafkaConsumerErrorHandler);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

假设我拥有的分区数是 4。我的分区分布是针对 KafkaListener 的:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"0", "1"}),
            containerFactory = "retryKafkaListenerContainerFactory")
public void receive(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"2", "3"}),
        containerFactory = "retryKafkaListenerContainerFactory")
public void receive1(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

现在我的问题是:

  1. 假设我有 2 台机器部署了此代码(具有相同的消费者组 ID)。如果我理解正确,如果我得到一个分区的事件,则其中一台机器的 kafkalistener 对应分区会监听,但其他机器的 kafkalistener 不会监听这个事件。是吗?

  2. 我的错误处理程序是:

  @Named
        public class KafkaConsumerErrorHandler implements ErrorHandler {
          @Inject
          private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

          @Override
          public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
              System.out.println("Shutting down all the containers");
              kafkaListenerEndpointRegistry.stop();
          }
        }

让我们谈谈一个场景,其中调用了消费者的 kafkalistener,它调用了 serviceInvoker.callService(event);,但服务已关闭,然后根据 retryKafkaListenerContainerFactory,它重试 3 次然后失败,然后调用 errorhandler 从而停止 kafkaListenerEndpointRegistry .这会关闭具有相同消费者组的所有其他消费者或机器,还是只关闭这个消费者或机器?

  1. 让我们谈谈 2 中的场景。是否需要更改任何配置以让 kafka 知道推迟这么长时间的确认?

  2. 我的 kafka 生产者每 10 分钟生成一次消息。我是否需要在我的消费者代码中的任何地方配置这 10 分钟,还是不知道这些?

  3. 在我的 KafkaListener 注释中,我硬编码了主题名称和分区。我可以在运行时更改它吗?

非常感谢任何帮助。提前致谢。 :)

【问题讨论】:

    标签: java spring apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:
    1. 正确;只有 1 人会得到它。
    2. 它只会停止本地容器 - Spring 对您的其他实例一无所知。
    3. 由于您有ackOnError=false,因此不会提交偏移量。
    4. 消费者不需要知道消息的发布频率。
    5. 您无法在运行时更改它们,但您可以使用属性占位符 ${...} 或 Spel 表达式 #{...} 在应用程序初始化期间进行设置。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-10-13
      • 2016-08-02
      • 2020-09-19
      • 2017-01-04
      • 2019-04-07
      • 2023-03-20
      • 2020-08-11
      • 1970-01-01
      相关资源
      最近更新 更多