【Question Title】: Heroku Apache Kafka Configuration with Spring Boot
【Posted】: 2019-12-06 17:32:10
【Question】:

I found many sample projects for configuring Apache Kafka with Spring Boot and tried several of them. They run fine on my Windows machine, but when I run them on Heroku they fail with an SSL connection error while connecting to Apache Kafka on Heroku.

Here are my producer and consumer configuration beans:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "URL");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "URL");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "xyz");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}


@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

【Comments】:

  • You haven't added any SSL settings here. Whatever settings you added for Dropwizard apply to the Spring factories as well.

Tags: java spring-boot authentication heroku apache-kafka


【Solution 1】:

You need to configure the truststore as described in the Heroku Kafka documentation.

An example using env-keystore might look like this:

EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword("KAFKA_TRUSTED_CERT");
EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT");

File trustStore = envTrustStore.storeTemp();
File keyStore = envKeyStore.storeTemp();

properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());

For a complete example, see this GitHub repo.
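The env-keystore snippet above fills a `Properties`-style object (as in the Dropwizard sample), whereas Spring's `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` take a `Map<String, Object>`. Copying the entries over bridges the two; a minimal sketch (the class name and placeholder values are illustrative, not from the original post):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaConfigBridge {
    // Copy java.util.Properties entries (as produced by the env-keystore
    // snippet above) into the Map<String, Object> form that Spring's
    // DefaultKafkaProducerFactory / DefaultKafkaConsumerFactory expect.
    static Map<String, Object> toConfigMap(Properties props) {
        Map<String, Object> config = new HashMap<>();
        props.forEach((key, value) -> config.put(key.toString(), value));
        return config;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values; real ones come from EnvKeyStore at runtime.
        props.setProperty("ssl.truststore.location", "/tmp/truststore.jks");
        props.setProperty("ssl.truststore.password", "secret");

        Map<String, Object> config = toConfigMap(props);
        System.out.println(config.get("ssl.truststore.location"));
    }
}
```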

【Comments】:

  • Yes, I already have that repo, but it uses Dropwizard-related dependencies; I need Spring Boot. Also, DefaultKafkaProducerFactory accepts a Map rather than a Properties object.

【Solution 2】:

I upgraded the Spring Boot dependency to 2.2.1.RELEASE and spring-kafka to 2.3.3.RELEASE, updated my configuration class as shown below, and it now connects to Apache Kafka on Heroku successfully:
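Assuming a Maven build (the original post does not show its pom.xml), the version bumps described above might look roughly like this:

```xml
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.1.RELEASE</version>
</parent>

<dependencies>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
  </dependency>
</dependencies>
```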

private Map<String, Object> buildDefaults() {
    Map<String, Object> properties = new HashMap<>();
    List<String> hostPorts = Lists.newArrayList();

    for (String url : Splitter.on(",").split(checkNotNull(getenv("KAFKA_URL")))) {
        try {
            URI uri = new URI(url);
            hostPorts.add(format("%s:%d", uri.getHost(), uri.getPort()));

            switch (uri.getScheme()) {
                case "kafka":
                    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
                    break;
                case "kafka+ssl":
                    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

                    try {
                        EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword("KAFKA_TRUSTED_CERT");
                        EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT");

                        File trustStore = envTrustStore.storeTemp();
                        File keyStore = envKeyStore.storeTemp();

                        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
                        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getAbsolutePath());
                        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
                        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
                        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getAbsolutePath());
                        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
                        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
                    } catch (Exception e) {
                        throw new RuntimeException("There was a problem creating the Kafka key stores", e);
                    }
                    break;
                default:
                    throw new IllegalArgumentException(format("unknown scheme; %s", uri.getScheme()));
            }
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(hostPorts));
    return properties;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = buildDefaults();

    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = buildDefaults();
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "xyz");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

【Comments】:

【Solution 3】:

There is a good chance that Kafka is doing SASL (JAAS) authentication over TLS. Please double-check, and add some extra configuration properties.
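For reference, the extra client properties in question are Kafka's standard `ssl.*` settings from the producer/consumer config documentation. A minimal sketch of assembling them into the map Spring's factories accept (the class name and all path/password values are placeholders, not from the original post):

```java
import java.util.HashMap;
import java.util.Map;

public class SslClientProps {
    // Standard Kafka client SSL property names (see the producer/consumer
    // config reference at kafka.apache.org). All argument values are
    // placeholders supplied by the caller.
    static Map<String, Object> sslProps(String trustStorePath, String trustStorePass,
                                        String keyStorePath, String keyStorePass) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStorePath);
        props.put("ssl.truststore.password", trustStorePass);
        props.put("ssl.keystore.location", keyStorePath);
        props.put("ssl.keystore.password", keyStorePass);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props =
            sslProps("/tmp/trust.jks", "tpass", "/tmp/key.jks", "kpass");
        System.out.println(props.get("security.protocol"));
    }
}
```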

【Comments】:

  • I checked the other configuration properties and couldn't find any that would let me add the SSL certificate, SSL key, etc. I configured Kafka with the Dropwizard framework and it configures and connects perfectly, but with Spring Boot I can't find the SSL-related configuration properties.
  • @MuhammadWaqar Did you see the ssl.* properties? kafka.apache.org/documentation/#producerconfigs
  • What is JASIG? Kafka doesn't use it, AFAIK.
  • Oh, yes. I'll correct myself; I meant SASL and the related JAAS file.
  • @cricket_007 Thanks. I went through the configs at the given link, added those properties, upgraded the Kafka and Spring Boot dependencies in pom.xml, and it configured and connected successfully.