【问题标题】:spring cloud stream - consumer group boundspring cloud stream - 消费组绑定
【发布时间】:2019-07-17 22:02:59
【问题描述】:

我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent

我的 Spring Boot 应用

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @StreamListener(value = "inEvent")
    public void getEvent(Event event){
        System.out.println(event.name);
    }
}

我的输入输出通道接口

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();
    @Output("outEvent")
    MessageChannel outEvent();
}

我的控制台日志--

:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 发现 小组协调员 singh:9092 (id: 2147483647 rack: null) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 撤销 先前分配的分区 [] :已撤销的分区:[] : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] (重新)加入 团体 : [消费者clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 成功 与第一代加入群组 : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 新设置 分配的分区 [inEvent-0] : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 重置 分区 inEvent-0 到偏移量 2 的偏移量。 :分配的分区:[inEvent-0]

【问题讨论】:

    标签: spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka


    【解决方案1】:

    group 属性不得位于 kafka 树中。 它必须是这样的:

    我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

    spring:
      cloud:
        stream:
           bindings:
              inEvent:
                group: eventin
                destination: event
    

    在文档中查看更多信息:http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.RELEASE/single/spring-cloud-stream.html#consumer-groups

    group 是一个公共属性,因此它与绑定器实现无关。 kafka 用于 Apache Kafka 特定属性,在其 binder 实现级别上公开。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-20
      • 2021-12-30
      • 2016-06-21
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      • 2017-04-28
      • 1970-01-01
      相关资源
      最近更新 更多