【问题标题】:Spring Cloud Stream Kafka Binder Configuration update at runtimeSpring Cloud Stream Kafka Binder 配置在运行时更新
【发布时间】:2021-08-04 23:06:27
【问题描述】:

我正在使用 Spring 云流和 Kafka 绑定器来使用 SASL 连接到 Kafka 集群。 SASL 配置如下所示:

spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=SCRAM-SHA-512
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config= .... required username="..." password="..."
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

我想以编程方式/在运行时更新用户名和密码,如何在 Spring Cloud Stream 中使用 Spring Kafka binders 进行更新?

旁注: 使用BinderFactory,我可以参考KafkaMessageChannelBinder,它有KafkaBinderConfigurationProperties,在其configuration hashmap 中我可以看到这些配置,但我想知道如何在运行时更新配置,以便这些更改反映在也有连接?

@Autowired
BinderFactory binderFactory

....

public void foo()
{
    KafkaMessageChannelBinder k = (KafkaMessageChannelBinder)binderFactory.getBinder(null, MessageChannel.class);
    // Using debugger I inspected k.configurationProperties.configuration which has the SASL properties I need to update
}

【问题讨论】:

    标签: java spring apache-kafka spring-kafka spring-cloud-stream


    【解决方案1】:

    可以使用配置提供 jaas 用户名和密码,这也意味着可以在运行时使用相同的属性覆盖它们。

    这里是一个例子:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multi-binder-samples/kafka-multi-binder-jaas/src/main/resources/application.yml#L26

    在运行时,您可以覆盖在application.properties 中设置的值。例如,如果您使用java -jar 运行应用程序,您可以简单地将属性与它一起传递:spring.cloud.stream.kafka.binder.jaas.options.username。然后这个新值将在应用程序运行期间生效。

    【讨论】:

    • 如何覆盖这些属性?我是否只是覆盖 application.properties 文件?还是我要更改一些配置对象?
    • 我更新了答案以显示如何覆盖属性。
    • 我不想重新启动应用程序,我想在运行时更新它,这意味着如果我使用 API 调用获取凭据,那么我想简单地使用这些更新我的 Kafka 连接新凭据
    • 这可能行不通(至少在 Kafka 的当前工作方式下)。请与更广泛的 Kafka 社区联系,看看是否可以为 Kafka 客户端应用程序这样做。如果可能的话,我们也可以通过 Spring Cloud Stream 实现它。
    【解决方案2】:

    我昨天遇到了这个问题,花了大约 3-4 个小时来弄清楚如何使用 Spring Kafka 活页夹以编程方式更新 Spring Cloud Stream 中的用户名和密码,因为不能/不应该在 Git 中存储密码。(Spring引导版本 2.5.2) 覆盖 bean KafkaBinderConfigurationProperties 有效。

    @Bean
    @Primary
    public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaBinderConfigurationProperties properties) {
        String saslJaasConfigString = "org.apache.kafka.common.security.scram.ScramLoginModule required username=${USERNAME_FROM_EXTERNAL_SYSTEM_LIKE_VAULT} password=${PASSWORD_FROM_EXTERNAL_SYSTEM_LIKE_VAULT}"
        Map<String, String> configMap = properties.getConfiguration();
        configMap.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfigString);
        return properties;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-16
      • 2021-02-17
      • 1970-01-01
      • 2018-06-20
      • 2020-03-05
      • 2020-03-23
      相关资源
      最近更新 更多