【发布时间】:2019-07-17 22:02:59
【问题描述】:
我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
我的 Spring Boot 应用
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
我的控制台日志--
:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 发现 小组协调员 singh:9092 (id: 2147483647 rack: null) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 撤销 先前分配的分区 [] :已撤销的分区:[] : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] (重新)加入 团体 : [消费者clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 成功 与第一代加入群组 : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 新设置 分配的分区 [inEvent-0] : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 重置 分区 inEvent-0 到偏移量 2 的偏移量。 :分配的分区:[inEvent-0]
【问题讨论】:
标签: spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka