【问题标题】:How to pass groupId value in @KafkaListener from database?如何从数据库中传递@KafkaListener 中的 groupId 值?
【发布时间】:2021-11-12 15:27:11
【问题描述】:

我想将我的 spring mvc 应用程序与 Kafka Server 连接以使用 kafka 消息。为此,我编写了 KafkaConsumer 类,如下所示。

@Service
public class KafkaConsumer {
    
    
    @KafkaListener(groupId = "my-group-id", topicPattern = "VID.*", containerFactory = SystemParameterConstants.KAFKA_LISTENER_CONTAINER_FACTORY)
    public void receivedMessage(@Payload String message) {
        logger.info("================ receivedMessage() ==================");
        logger.info("::: Message recieved from kafka ::: {}", message);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ...
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

这里我有硬编码的组 ID “my-group-id”,但我想从数据库中读取这个 groupId,以便我可以为不同的环境使用不同的 groupId。

请提出解决方案。谢谢!

【问题讨论】:

  • 为什么要从数据库而不是应用程序配置文件中读取?

标签: apache-kafka spring-kafka


【解决方案1】:

查看它的 JavaDocs:

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

如果你有一个 bean,它从 DB 中读取数据,那么你可以这样做:

groupId = "#{myDbBean.grouIdFromDb}"

另见文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-15
    • 1970-01-01
    • 2019-09-04
    • 2022-01-16
    • 2018-04-21
    • 1970-01-01
    相关资源
    最近更新 更多