【发布时间】:2018-04-09 10:18:55
【问题描述】:
我有三个消费者,它们都有多个实例,它们都在消费同一个主题。我希望每个消费者消费一次主题。为此,我创建了一个消费者组。从我读到的内容来看,Kafka 应该足够聪明,可以选择一个服务实例来使用该主题,但这并没有发生,所有消费者的所有实例都在使用该主题
我认为这可能与三个消费者都具有相同的组名有关,所以我关闭了两个消费者,让一个消费者有两个实例,但是当消费者组时,我仍然看到两条记录进入数据库应该只选择一个实例插入数据库。
我在下面做错了什么或遗漏了什么吗?
Application.yml
spring:
cloud:
stream:
bindings:
input-data:
destination: publisheddata.t
group: publisheddata
kafka:
bindings:
input:
consumer:
autoCommitOffset: false
binder:
auto-create-topics: true
kafka:
mode: raw
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka:9092
zk-nodes: kafka:2181
Channels.java
public interface Channels {
String INPUT_DATA = "input-data";
@Input(INPUT_DATA)
SubscribableChannel dataInput();
}
DataHandler.java
@EnableBinding(Channels.class)
@Configuration
public class DataMessageHandler {
@StreamListener(Channels.INPUT_DATA)
public void handle(Message<?> message) {
... handling message ...
}
【问题讨论】:
-
这个话题你有多少个分区?
-
主题上没有设置分区
标签: java apache-kafka kafka-consumer-api