【发布时间】:2020-07-13 22:45:54
【问题描述】:
您好,我对 ReplyingKafkaTemplate 有以下配置,我想根据相关性 ID 在消费者之前过滤消息,但由于某种原因,它没有过滤,任何人都可以提出问题所在。
@Bean
public ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer() {
ConcurrentKafkaListenerContainerFactory<String, FireflyResponse> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retry));
factory.setRetryTemplate(retryTemplate);
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, FireflyResponse>() {
@Override
public boolean filter(ConsumerRecord<String, FireflyResponse> consumerRecord) {
return consumerRecord.headers().lastHeader(KafkaHeaders.CORRELATION_ID) == null;
}
});
return factory.createContainer(responseTopic);
}
@Bean
public ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> kafkaTemplate(
ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer) {
ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> template = new ReplyingKafkaTemplate<>(
producerFactory(), replyContainer);
template.setDefaultReplyTimeout(Duration.ofSeconds(connectionTimeout));
template.setSharedReplyTopic(true);
return template;
}
【问题讨论】: