【问题标题】:Spring Actuator + Kafka Streams - Add kafka stream status to health check endpointSpring Actuator + Kafka Streams - 将 kafka 流状态添加到健康检查端点
【发布时间】:2020-01-20 06:27:28
【问题描述】:

我有一个使用 apache kafka-streams 的 Spring Boot 应用程序。我不使用弹簧云流。我添加了执行器健康检查端点。我在application.yml 中这样配置它:

management:
  health.db.enabled: false
  endpoints.web:
    base-path:
    path-mapping.health: /

当引发运行时异常并且我的流已停止,如日志所示,但运行状况检查状态为 UP。

2019-09-17 13:16:31.522 INFO 1 --- [ Thread-5] org.apache.kafka.streams.KafkaStreams : stream-client [lpp-model-stream-7e6e8fea-fcad-4033-92a4-5ede50de6e17] Streams client stopped completely

如何将 kafka 流状态绑定到健康检查端点?

我的 pom.xml:

  <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
            <dependency>
                <groupId>data-wizards</groupId>
                <artifactId>lpp-common-avro</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-streams-avro-serde</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
            <dependency>
                <groupId>io.vavr</groupId>
                <artifactId>vavr</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

链接到我创建流的代码:https://gist.github.com/solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3

【问题讨论】:

  • 您是否尝试过实现自定义 HealthIndicator?,您可以覆盖使用 KafkaStreams.setExceptionHandler 来标记自定义 HealthIndicator 实现中的错误。
  • 这正是我想要做的 - 实现一个 HealthIndicator - 但是我在 Java / Spring / Kafka 方面不够精通,我想找到一些工作示例将其复制到我的项目
  • 我根据以下理解回答了这个问题。如果这不起作用,您能否提供一个可以处理的minimal reproducible 示例?

标签: spring spring-boot apache-kafka


【解决方案1】:

健康信息是从实现应用程序上下文中配置的 HealthIndicator 接口的所有 bean 收集的。

您可以创建一个自定义 HealthIndicator,您可以使用它来报告 Kafka Streams 错误。

如下创建您的 HealthIndicator 单例 bean

@Component
public class MyHealthIndicator implements HealthIndicator {
    private Exception caughtException = null;
    // Any other information you want to store.
    @Override
    public Health health() {

        if (caughtException == null) {
            return Health
                .up()
                .withDetail("status", "Kafka Streams Running")
                .build();
        }
        else {
            return Health
                .down()
                .withDetail("status", "Not Available")
                .withDetail("error", caughtException.getMessage())
                .build();
        }
    }
    public void setException(Exception caughtException) {
        this.caughtException = caughtException;
    }
}

然后你可以在你使用 Kafka Streams 的地方自动装配这个 bean,你可以如下设置异常。

public class MyApp {
    @Autowire
    private MyHealthIndicator healthIndicator; // You can also use constructor injection instead. 

    // Rest of the code


    public void init() {
        // Streams initialization code here
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            healthIndicator.setException(new Exception(throwable));
        });
    }

}

我希望这会有所帮助,但如果没有,请提供一个 Minimal, Verifiable, Reproducible 示例,有人可以使用该示例

【讨论】:

    【解决方案2】:

    KafkaStreams 维护内存中 State,它可以映射到 Actuator 的健康状态。状态可能是以下之一:CREATEDERRORNOT_RUNNINGPENDING_SHUTDOWNREBALANCINGRUNNING - 它们是不言自明的。有关状态转换,请参阅文档https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html

    如果您正在寻找一个完整的示例,您可以使用以下示例并根据您的需要对其进行更新(例如,您可能不会将 CREATED 视为状态 UP)。确保在应用程序上下文中有一个 KafkaStreams 类型的 bean。

    //Note that class name prefix before `HealthIndicator` will be camel-cased
    //and used as a health component name, `kafkaStreams` here
    @Component
    public class KafkaStreamsHealthIndicator implements HealthIndicator {
    
        //if you have multiple instances, inject as Map<String, KafkaStreams>
        //Spring will map KafkaStreams instances by bean names present in context
        //so you can provide status details for each stream by name
        @Autowired
        private KafkaStreams kafkaStreams; 
    
        @Override
        public Health health() {
            State kafkaStreamsState = kafkaStreams.state();
    
            // CREATED, RUNNING or REBALANCING
            if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunning()) {
                //set details if you need one
                return Health.up().build();
            }
    
            // ERROR, NOT_RUNNING, PENDING_SHUTDOWN, 
            return Health.down().withDetail("state", kafkaStreamsState.name()).build();
        }
    }
    

    然后健康端点将显示如下:

    {
        "status": "UP",
        "kafkaStreams": {
            "status": "DOWN",
            "details": {  //not included if "UP"
                "state": "NOT_RUNNING"
            }
        }
    }
    
    

    【讨论】:

    • 基本上KStream只用于定义topology,不包含任何运行时信息。实际处理仅在您创建KafkaStreams 的实例并调用其start() 方法时开始。所以无论如何你都需要它,请参阅这篇文章以获取一个很好的示例baeldung.com/java-kafka-streams。确保在 Spring 上下文中定义 KafkaStreamsbean。我应该使用用于 HealthCheck 的 KafkaStreams bean 示例更新答案吗?
    • 不,我现在明白了 - KafkaStreams 只是隐藏在 spring 上下文中的某个地方,我有一些 spring 依赖项 - 我会接受你的回答
    • 我刚刚发现您通过 spring-kafka 使用 kafka 流。在这种情况下,KafkaStreams bean 不会暴露给上下文,它由StreamsBuilderFactoryBean 在内部管理。希望您可以通过streamsBuilder.getKafkaStreams() 获得KafkaStreams 的实例;所以我建议在 java 配置 中定义 KafkaStreamsHealthIndicator 你已经有 streamsBuilder。
    • 我正在尝试采用此解决方案,但是当我想卷曲 kafka 健康指标时。 curl -v http://localhost:8080/actuator/health * Trying ::1... * TCP_NODELAY set * Connection failed * connect to ::1 port 8080 failed: Connection refused * Trying 127.0.0.1... * TCP_NODELAY set * Connection failed * connect to 127.0.0.1 port 8080 failed: Connection refused * Failed to connect to localhost port 8080: Connection refused * Closing connection 0 curl: (7) Failed to connect to localhost port 8080: Connection refused 有什么帮助吗?
    • 可能有几十个原因,KafkaStreams指标不太可能是原因,因为您正在查询应用程序健康状态的根源。尝试搜索类似的问题,例如stackoverflow.com/questions/35517713/…
    猜你喜欢
    • 1970-01-01
    • 2018-02-07
    • 1970-01-01
    • 2023-03-10
    • 2015-12-06
    • 2018-12-25
    • 2022-06-13
    • 2019-08-05
    • 1970-01-01
    相关资源
    最近更新 更多