【问题标题】:Kafka: Consumers adding multiple recordsKafka:消费者添加多条记录
【发布时间】:2018-04-09 10:18:55
【问题描述】:

我有三个消费者,它们都有多个实例,它们都在消费同一个主题。我希望每个消费者消费一次主题。为此,我创建了一个消费者组。从我读到的内容来看,Kafka 应该足够聪明,可以选择一个服务实例来使用该主题,但这并没有发生,所有消费者的所有实例都在使用该主题

我认为这可能与三个消费者都具有相同的组名有关,所以我关闭了两个消费者,让一个消费者有两个实例,但是当消费者组时,我仍然看到两条记录进入数据库应该只选择一个实例插入数据库。

我在下面做错了什么或遗漏了什么吗?

Application.yml

spring:
  cloud:
    stream:
      bindings:
        input-data:
          destination: publisheddata.t    
          group: publisheddata
      kafka:
    bindings:         
      input: 
        consumer:
           autoCommitOffset: false                
    binder:
      auto-create-topics: true
      kafka:
        mode: raw
spring:
    cloud:
      stream:
        kafka:
          binder:
            brokers: kafka:9092
            zk-nodes: kafka:2181

Channels.java

public interface Channels {

  String INPUT_DATA = "input-data";

  @Input(INPUT_DATA)
  SubscribableChannel dataInput();
}

DataHandler.java

@EnableBinding(Channels.class)
@Configuration
public class DataMessageHandler {

@StreamListener(Channels.INPUT_DATA)
public void handle(Message<?> message) {
  ... handling message ...
}

【问题讨论】:

  • 这个话题你有多少个分区?
  • 主题上没有设置分区

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

以防将来有人遇到此问题,在研究了几天后,我发现问题出在我使用的 Spring Cloud 版本上,我使用的是Brixton.RELEASE,一旦我将其更新为Dalston.SR2解决了我的问题

【讨论】: