【问题标题】:Embedded Kafka not showing consumer offset嵌入式 Kafka 未显示消费者偏移量
【发布时间】:2021-03-27 11:52:50
【问题描述】:

有一个嵌入式 kafka 实例作为测试的一部分运行。 我正在尝试验证是否已读取所有消息,但从 kafka 管理客户端得到的结果为空。

Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
        try{
            return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });

地图总是空的。 我尝试设置 ack all 并设置 100 毫秒 autoOffsetCommit 等待看看这是否有任何区别,但没有运气。

 System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaRule.getEmbeddedKafka()
            .getBrokersAsString());
 System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
 System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
 System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
 System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
 System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");       System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");

【问题讨论】:

    标签: kafka-consumer-api spring-kafka spring-cloud-stream embedded-kafka


    【解决方案1】:

    您什么时候设置这些系统属性?您确定绑定使用的是嵌入式代理吗?

    这对我来说很好。

    @SpringBootApplication
    public class So65329718Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So65329718Application.class, args);
        }
    
        @Bean
        Consumer<String> consume() {
            return System.out::println;
        }
    
    }
    
    spring.cloud.stream.bindings.consume-in-0.group=theGroup
    
    @SpringBootTest
    @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class So65329718ApplicationTests {
    
        @Autowired
        KafkaTemplate<byte[], byte[]> template;
    
        @Autowired
        EmbeddedKafkaBroker broker;
    
        @Test
        void test() throws InterruptedException {
            this.template.send("consume-in-0", "foo".getBytes());
            Thread.sleep(10_000);
            this.broker.doWithAdmin(admin -> {
                try {
                    System.out.println(admin.listConsumerGroupOffsets("theGroup").partitionsToOffsetAndMetadata()
                            .get(10, TimeUnit.SECONDS));
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    e.printStackTrace();
                }
            });
        }
    
    }
    
    foo
    ...
    {consume-in-0-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
    

    (如果没有特定于 binder 的属性,则 binder 使用 spring.kafka.bootstrap-servers)。

    【讨论】:

    • 我确定它使用的是嵌入式实例,因为我可以查询消费者组并显示它们。在我的应用处理入站主题的消息后,我还可以看到输出主题的输出。我能看到它们唯一的大区别是使用嵌入式 Kafka 类规则,而不是注释和自动装配。
    • 那没有区别;其他事情正在发生。如果你能提供一个完整、简单的示例项目,我可以看看有什么问题。
    猜你喜欢
    • 1970-01-01
    • 2019-05-01
    • 2018-02-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-11
    相关资源
    最近更新 更多