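【Question title】: Spring Integration Kafka Manual Acknowledgment
【Posted】: 2015-09-16 14:12:59
【Question】:

I am having a problem with manual acknowledgment when using KafkaTopicOffsetManager. As soon as acknowledge() is called, the topic starts filling up with repeated messages. The Kafka broker has log.cleaner.enable set to true, and the topic uses cleanup.policy=compact. Thanks for your help.

Configuration: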

@Bean
public ZookeeperConfiguration zookeeperConfiguration() {
    ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(kafkaConfig.getZookeeperAddress());
    zookeeperConfiguration.setClientId("clientId");
    return zookeeperConfiguration;
}

@Bean
public ConnectionFactory connectionFactory() {
    return new DefaultConnectionFactory(zookeeperConfiguration());
}

@Bean
public TestMessageHandler messageListener() {
    return new TestMessageHandler();
}

@Bean
public OffsetManager offsetManager() {
    ZookeeperConnect zookeeperConnect = new ZookeeperConnect(kafkaConfig.getZookeeperAddress());
    // KafkaTopicOffsetManager stores offsets in a dedicated topic ("<topic>_OFFSET"),
    // separate from the topic being consumed.
    return new KafkaTopicOffsetManager(zookeeperConnect, kafkaConfig.getTopic() + "_OFFSET");
}

@Bean
public KafkaMessageListenerContainer kafkaMessageListenerContainer() {
    KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(connectionFactory(), kafkaConfig.getTopic());
    kafkaMessageListenerContainer.setMessageListener(messageListener());
    kafkaMessageListenerContainer.setOffsetManager(offsetManager());
    return kafkaMessageListenerContainer;
}

Listener:

public class TestMessageHandler implements AcknowledgingMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(TestMessageHandler.class);

    @Override
    public void onMessage(KafkaMessage message, Acknowledgment acknowledgment) {
        logger.info(message.toString());

        acknowledgment.acknowledge();
    }
}

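【Comments】:

  • KafkaTopicOffsetManager needs its own topic for maintaining the offsets of the topic actually being consumed. I have updated the question with the correct configuration to help others who run into this.

Tags: spring-integration apache-kafka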


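【Answer 1】:

KafkaTopicOffsetManager needs its own topic for maintaining the offsets of the topic actually being consumed. Point it at a dedicated offsets topic rather than at the topic the listener container reads from.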
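
One plausible way to picture the symptom is as a feedback loop: if every acknowledgment appends an offset record to the same topic the listener reads, each acknowledged message produces another message to consume. The sketch below is a toy in-memory model of that idea only. It does not use Spring Integration Kafka, the class and method names (OffsetTopicSketch, deliveries) are hypothetical, and whether the library fails in exactly this way is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class OffsetTopicSketch {

    /**
     * Toy model: consumes from a data "topic" (a queue) and, for every
     * delivered message, appends one offset record to the offset "topic".
     * Returns how many deliveries happened before the data topic ran dry
     * or the safety cap was reached.
     */
    static int deliveries(boolean shareTopic, int seedMessages, int cap) {
        Deque<String> dataTopic = new ArrayDeque<String>();
        for (int i = 1; i <= seedMessages; i++) {
            dataTopic.add("message-" + i);
        }
        // Either reuse the data topic for offsets (the misconfiguration)
        // or give the offsets a queue of their own.
        Deque<String> offsetTopic = shareTopic ? dataTopic : new ArrayDeque<String>();

        int delivered = 0;
        while (!dataTopic.isEmpty() && delivered < cap) {
            dataTopic.poll();                        // listener receives a record
            delivered++;
            offsetTopic.add("offset-" + delivered);  // acknowledge() writes an offset record
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("separate offset topic: " + deliveries(false, 2, 100));
        System.out.println("shared topic:          " + deliveries(true, 2, 100));
    }
}
```

With a separate offsets queue the loop stops after the two seeded messages. With a shared queue it only stops at the safety cap, because every acknowledgment refills the queue it is draining.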

【Comments】:

    【Answer 2】:

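    If you would rather not decode the message payload yourself (which I find painful), have your listener extend the abstract class AbstractDecodingAcknowledgingMessageListener and pass org.springframework.integration.kafka.serializer.common.StringDecoder as the decoder.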
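
To illustrate the pattern, here is a minimal, library-free sketch of a listener base class that decodes the raw key and payload bytes once and hands typed values to the subclass. Every type in it (ByteDecoder, Ack, DecodingListener) is a hypothetical stand-in written for this example, not the Spring Integration Kafka API, and the UTF-8 decoder is an assumption about what a string decoder would do.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DecodingListenerSketch {

    /** Hypothetical stand-in for a decoder: turns raw bytes into a typed value. */
    interface ByteDecoder<T> {
        T fromBytes(byte[] bytes);
    }

    /** Hypothetical stand-in for a manual acknowledgment handle. */
    interface Ack {
        void acknowledge();
    }

    /** Decodes key and payload, then delegates to the subclass with typed values. */
    abstract static class DecodingListener<K, P> {
        private final ByteDecoder<K> keyDecoder;
        private final ByteDecoder<P> payloadDecoder;

        DecodingListener(ByteDecoder<K> keyDecoder, ByteDecoder<P> payloadDecoder) {
            this.keyDecoder = keyDecoder;
            this.payloadDecoder = payloadDecoder;
        }

        final void onMessage(byte[] rawKey, byte[] rawPayload, Ack ack) {
            K key = rawKey == null ? null : keyDecoder.fromBytes(rawKey);
            P payload = rawPayload == null ? null : payloadDecoder.fromBytes(rawPayload);
            doOnMessage(key, payload, ack);
        }

        abstract void doOnMessage(K key, P payload, Ack ack);
    }

    /** Assumed string decoding: interpret the bytes as UTF-8 text. */
    static final ByteDecoder<String> UTF8 =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    /** Pushes one raw message through a decoding listener and records what it saw. */
    static List<String> demo(byte[] rawKey, byte[] rawPayload) {
        final List<String> seen = new ArrayList<String>();
        DecodingListener<String, String> listener =
                new DecodingListener<String, String>(UTF8, UTF8) {
                    @Override
                    void doOnMessage(String key, String payload, Ack ack) {
                        seen.add(key + "=" + payload); // already decoded here
                        ack.acknowledge();             // manual acknowledgment
                    }
                };
        listener.onMessage(rawKey, rawPayload, () -> seen.add("acked"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo("k1".getBytes(StandardCharsets.UTF_8),
                                "hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The point of the design is that byte handling lives in one place, so the subclass only ever sees decoded values and the acknowledgment handle.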

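    public class TestMessageHandlerDecoding extends AbstractDecodingAcknowledgingMessageListener {

        private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageHandlerDecoding.class);

        public TestMessageHandlerDecoding(Decoder keyDecoder, Decoder payloadDecoder) {
            super(keyDecoder, payloadDecoder);
        }

        @Override
        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata, Acknowledgment acknowledgment) {
            // key and payload arrive already decoded by the decoders passed to the constructor
            LOGGER.info("payload={}", payload);
            // For manual acknowledgment, call acknowledgment.acknowledge() here, as in the question's listener.
        }
    }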

    【Comments】:
