【问题标题】:Spring Kafka filter not filtering consumer recordSpring Kafka过滤器不过滤消费者记录
【发布时间】:2023-04-08 23:29:01
【问题描述】:

我正在尝试根据 ConsumerRecord 中某个字段的内容在消费前过滤 ConsumerRecord 消息。

应用过滤器之前的示例消费者记录是(在值中查找 GP_ID):

 ConsumerRecord(topic = jdbc-project, partition = 0, offset = 0, CreateTime = 1551118248440, serialized key size = -1, serialized value size = 69, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"GP_ID": {"bytes": "@"}, "PROJECT_ID": {"bytes": "\u001E\u008C"}, "START_DATE": 1009843200000, "END_DATE": 1041292800000, "TITLE": "Project- FPH", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=7820, gpId=64)

当我在 kafkaListenerContainerFactory() 中设置如下记录过滤策略时:

@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
        @Override
        public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
            long gpId= KafkaRecordVO.convertByteBufferToLong(consumerRecord.value().get("GP_ID"));
            if(gpId == 10766L || gpId == 10823L || gpId == 10459L || gpId == 10649L)
                return false;
            else
                return true;
        }
    });
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

KafkaRecordVO.convertByteBufferToLong 正在将 bytebuffer 值转换为 long 值。

这是正确评估,并返回真/假。

但是,当它被 Kafka 监听器消费时,如下所示:

@KafkaListener(id = "project", topics = "jdbc-project", containerFactory = "kafkaListenerContainerFactoryProject")
public void consumeProject(ConsumerRecord<String, GenericRecord> record,Acknowledgment acknowledgment) {
    log.debug(record.toString());
    KafkaRecordVO recordVo = new KafkaRecordVO().projectId(record.value().get("PROJECT_ID"))
                                                .budgetYear(record.value().get("GP_ID"));
    log.debug(recordVo.toString());
}

这将返回删除我过滤的字段值的记录:“GP_ID”

这些是应用过滤器后生成的示例日志(在值中查找 GP_ID):

ConsumerRecord(topic = jdbc-project, partition = 0, offset = 171275, CreateTime = 1551118279371, serialized key size = -1, serialized value size = 181, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "GP_ID": {"bytes": ""}, "PROJECT_ID": {"bytes": "\u0005â^"}, "START_DATE": 1470009600000, "END_DATE": 1532995200000, "TITLE": "Project 2016 - 2016", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=385630, gpId=0)

我在我的 kafkaListener 中为这个主题在其他字段的日志中得到了这个:“GP_ID”:{“bytes”:“”}。如何不剥离价值?这里有什么问题?

更新:问题发生在我使用实用方法将字节转换为长整数时。它将位置设置为数组的末尾。这就是它返回空数组的原因:

public static Long convertByteBufferToLong(Object byteBuff) {
    //After adding below line, the issue got resolved
    ByteBuffer buf = ((ByteBuffer) byteBuff).duplicate();
    byte[] arr = new byte[buf.remaining()];
    buf.get(arr);
    BigInteger bi =new BigInteger(1,arr);
    return bi.longValue();
}

【问题讨论】:

    标签: kafka-consumer-api spring-kafka


    【解决方案1】:

    你的建议毫无意义;过滤器适配器有这个代码...

    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        if (!filter(consumerRecord)) {
            switch (this.delegateType) {
                case ACKNOWLEDGING_CONSUMER_AWARE:
                    this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
                    break;
                case ACKNOWLEDGING:
                    this.delegate.onMessage(consumerRecord, acknowledgment);
                    break;
                case CONSUMER_AWARE:
                    this.delegate.onMessage(consumerRecord, consumer);
                    break;
                case SIMPLE:
                    this.delegate.onMessage(consumerRecord);
            }
        }
        else {
            ackFilteredIfNecessary(acknowledgment);
        }
    }
    

    它不对记录进行任何操作。

    尝试在调试器中运行,看看是否可以看到发生了什么。

    【讨论】:

    • 是的,和api无关。实用程序方法存在问题。更新了我的问题。感谢您抽出宝贵时间查看问题。
    猜你喜欢
    • 1970-01-01
    • 2019-11-21
    • 2022-12-07
    • 1970-01-01
    • 2018-03-18
    • 1970-01-01
    • 2018-04-26
    • 1970-01-01
    • 2018-12-15
    相关资源
    最近更新 更多