【问题标题】:Kafka RecordFilterStrategy does not filter records when using spring-kafka ReplyingKafkaTemplateKafka RecordFilterStrategy 在使用 spring-kafka ReplyingKafkaTemplate 时不过滤记录
【发布时间】:2020-07-13 22:45:54
【问题描述】:

您好,我对 ReplyingKafkaTemplate 有以下配置,我想根据相关性 ID 在消费者之前过滤消息,但由于某种原因,它没有过滤,任何人都可以提出问题所在。

@Bean
public ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer() {
    ConcurrentKafkaListenerContainerFactory<String, FireflyResponse> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retry));
    factory.setRetryTemplate(retryTemplate);
    factory.setConcurrency(3);
    factory.setBatchListener(true);
    factory.setAckDiscarded(true);
     factory.setRecordFilterStrategy(new RecordFilterStrategy<String, FireflyResponse>() {
        @Override
        public boolean filter(ConsumerRecord<String, FireflyResponse> consumerRecord) {
            return consumerRecord.headers().lastHeader(KafkaHeaders.CORRELATION_ID) == null;
        }
    });
    return factory.createContainer(responseTopic);
}

@Bean
public ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> kafkaTemplate(
    ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer) {
    ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> template = new ReplyingKafkaTemplate<>(
        producerFactory(), replyContainer);
    template.setDefaultReplyTimeout(Duration.ofSeconds(connectionTimeout));
    template.setSharedReplyTopic(true);
    return template;
}

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    回复模板总是设置相关 id 标头...

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
        Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
        CorrelationKey correlationId = this.correlationStrategy.apply(record);
        Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
        ...
    

    它需要将回复与请求关联起来。

    编辑

    您似乎正在尝试过滤响应;不支持;仅过滤请求。

    如果您不想回复,只需从侦听器返回null

    【讨论】:

    • 我尝试使用其他属性进行过滤,但仍然没有调用 RecordFilterStrategy
    • 哦;我刚刚注意到您正在尝试过滤回复;不支持;仅过滤请求。如果您不想回复,只需从侦听器返回null
    猜你喜欢
    • 1970-01-01
    • 2023-04-08
    • 2022-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-09
    • 2020-05-27
    • 2018-05-03
    相关资源
    最近更新 更多