【发布时间】:2021-03-11 06:42:13
【问题描述】:
我试图让几个不同的 Spring Cloud 微服务都连接到 Kafka/Zookeeper 集群,都在 Kubernetes 中。微服务使用 org.springframework.kafka:spring-kafka - 作为事件的消费者和生产者。
所有服务都可以连接到 kafka - 并且创建了主题;但是每个服务的消费者非常不一致。
例如,当服务启动一次时,所有的消费者都会监听消息并调用函数。但是,当我重新启动所有内容(包括 kafka 和 zookeeper)时,它要么无法正常工作,要么不同服务中的一些消费者会工作等等......
这是我的一些配置——我没有任何基于 Java 的配置——就在我的 application.yml 中,如下所示:
spring:
....
kafka:
consumer:
bootstrap-servers: api-kafka.default.svc.cluster.local:9092
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: api-event
enable-auto-commit: false
producer:
bootstrap-servers: api-kafka.default.svc.cluster.local:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
ack-mode: manual
...
还有我的主要课程:
@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleServiceApplication.class, args);
}
.....
}
最后,我的消费者:
@Component
public class MessageListener {
@KafkaListener(
topics = "myTopic")
public void eventListener(String serializedMessage) {
try {
....
消息可以正常发送到代理,但不会被其他服务使用。
我意识到每个服务属性上都没有到主题的映射,我该如何通过 application.yml 做到这一点?
我敢打赌我犯了一个非常严重的错误,但是是的!非常感谢任何 cmets 或帮助
【问题讨论】:
-
你的 kafka 主题有多少个分区?如果您有一个分区,那么只有一项服务使用消息是正常的。如果希望更多服务并行消费主题消息,则需要增加分区数。
-
@OctavianR。哦,好吧 - 有趣 - 不知道!据我所知,我对所有服务只有一个分区,所以可能就是这样......我想我需要定义具有 NewTopic 类型的 Java bean 并使用正确数量的分区构建主题。感谢您的评论 - 我现在就试试
-
但是每个实体都有不同的主题,其中 EventTypes 作为字段等...-但这应该不会影响太大
-
不,应该不重要
标签: java spring-boot kubernetes apache-kafka spring-cloud