【Posted】: 2022-01-09 18:53:36
【Question】:
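I have a pod running in OpenShift. The pod runs a Kafka consumer that continuously polls a topic and stores the records locally for a given amount of time. Occasionally the topic receives a large burst of new records. Because of the memory needed to store those records, this leads to an OOM exception. That is acceptable, though, since the pod can be restarted and start consuming again.
The problem, however, is that the pod is not restarted on the OOM exception. After the crash, the health endpoint (the server) is still alive. As a result, the pod is not restarted, because OpenShift still considers it healthy. Judging from the log messages, the shutdownHook never seems to run.
My health endpoint service is implemented as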
class HealthService : ILogging by Logging<HealthService>() {
    @Get("/health")
    fun health(): HttpResponse {
        log.trace("I'm $responseText")
        return HttpResponse.of(
            statusCode,
            MediaType.PLAIN_TEXT_UTF_8,
            responseText
        )
    }

    /**
     * Should be called when the graceful shutdown process is completed. The service will now be
     * considered dead by Kubernetes and the pod will be restarted.
     */
    fun die() {
        log.trace("Last breath...")
        health.set(DEAD)
    }

    /** Thread-safe health state. */
    private val health: AtomicInteger = AtomicInteger(ALIVE)

    private val responseText
        get() =
            when (health.get()) {
                ALIVE -> "alive"
                SICK -> "sick"
                else -> "dead"
            }

    private val statusCode
        get() =
            when (health.get()) {
                DEAD -> HttpStatus.SERVICE_UNAVAILABLE
                else -> HttpStatus.OK
            }

    companion object {
        const val ALIVE = 0
        const val SICK = 1
        const val DEAD = 2
    }
}
My main application is implemented as
val log = Logger()
lateinit var healthService: HealthService

fun run() {
    val consumer = createKafkaConsumer()
    val server = buildServer(log)
    val future = server.start()
    future.join()
    Runtime.getRuntime().addShutdownHook(
        Thread {
            log.info("Closing down...")
            server.close()
            healthService.die()
        }
    )
    consumer.run()
}

private fun buildServer(log: Logger): Server {
    log.info("Loading HTTP Endpoints on port ${config.port}...")
    val sb = Server.builder().http(config.port).service(
        "/"
    ) { _, _ -> HttpResponse.of("OK\n") }
    healthService = HealthService()
    sb.annotatedService(healthService)
    return sb.build()
}
The Kafka consumer is implemented simply as
class Consumer() {
    val cache = Cache()
    val name = "myConsumer"

    fun run() {
        try {
            val pollDuration = config.kafka.pollDurationSeconds
            while (true) {
                val records = consumer.poll(Duration.ofSeconds(pollDuration))
                addToCache(records)
            }
        } catch (e: Exception) {
            log.error("Unexpected event happened. e=$e", e)
        } finally {
            log.info("Closing down $name consumer...")
            consumer.close()
            cache.close()
        }
    }
}
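One detail in the consumer above is worth illustrating: on the JVM, `OutOfMemoryError` is a subclass of `Error`, not of `Exception`, so a `catch (e: Exception)` block does not intercept it. Only the `finally` block runs, and the error then propagates out of `run()`. The following is a minimal, self-contained sketch of that behaviour. The helper names (`runCatchingException`, `runCatchingThrowable`, `dead`) are hypothetical and the OOM is simulated by throwing the error directly; it is not the original consumer.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for HealthService.die(): a flag that a probe could read.
val dead = AtomicBoolean(false)

// Mirrors the shape of Consumer.run(): only Exception is caught.
fun runCatchingException(work: () -> Unit): String =
    try {
        work()
        "completed"
    } catch (e: Exception) {
        "caught Exception"
    }

// Variant that also sees Error subclasses such as OutOfMemoryError.
fun runCatchingThrowable(work: () -> Unit): String =
    try {
        work()
        "completed"
    } catch (t: Throwable) {
        dead.set(true) // assumption: this is where the health state would be flipped
        "caught ${t.javaClass.simpleName}"
    }

fun main() {
    // Simulated OOM; a real one would come from an allocation failure.
    val simulateOom: () -> Unit = { throw OutOfMemoryError("simulated") }

    // The error passes straight through catch (e: Exception).
    val escaped = try {
        runCatchingException(simulateOom)
        false
    } catch (e: OutOfMemoryError) {
        true
    }
    println("escaped catch (e: Exception): $escaped")

    println(runCatchingThrowable(simulateOom))
    println("dead flag: ${dead.get()}")
}
```

Catching `Throwable` is shown here only to make the difference visible. After a real `OutOfMemoryError` the process may be in an unreliable state, so treating it as fatal is usually safer than trying to continue.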
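To summarize: an OOM exception is thrown in consumer.run(), which crashes the program. The health endpoint, however, keeps running. As a result, OpenShift still considers the program/pod healthy and the pod is not restarted.
How can I take down the health endpoint when an OOM exception is thrown in consumer.run()?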
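For context, a JVM only shuts down (and only then runs its shutdown hooks) once every non-daemon thread has finished or an exit is requested explicitly. If the server keeps non-daemon threads alive, the thread running the consumer can die while the process and its health endpoint live on. One possible direction, sketched under assumptions, is to attach an uncaught-exception handler to the consumer thread and flip the health state there. The names `health`, `statusCode` and `startConsumerThread` are hypothetical stand-ins, and the plain integer status codes replace the HTTP types used in the question.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

const val ALIVE = 0
const val DEAD = 2

// Hypothetical stand-in for the state held by HealthService.
val health = AtomicInteger(ALIVE)

// Status code a liveness probe would see for the current state.
fun statusCode(): Int = if (health.get() == DEAD) 503 else 200

// Runs the consumer loop on its own thread. Any Throwable that escapes the
// loop, including OutOfMemoryError, reaches the handler and marks the service dead.
fun startConsumerThread(loop: () -> Unit): Thread {
    val thread = Thread({ loop() }, "consumer")
    thread.setUncaughtExceptionHandler { _, _ ->
        health.set(DEAD)
        // Alternative (assumption): call kotlin.system.exitProcess(1) here so the
        // container exits and the platform restarts it without waiting for a probe.
    }
    thread.start()
    return thread
}

fun main() {
    println("before: ${statusCode()}")
    // Simulated OOM in the consumer loop.
    val thread = startConsumerThread { throw OutOfMemoryError("simulated") }
    thread.join()
    println("after: ${statusCode()}")
}
```

The handler runs on the dying thread before it terminates, so by the time `join()` returns the health state has already changed. Whether to report unhealthy and wait for the liveness probe, or to exit the process outright, is a design choice; exiting avoids depending on a JVM that has just run out of memory.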
Edit: added the Kubernetes configuration
...
readiness:
  path: /health
liveness:
  path: /health
....
【Discussion】:
Tags: kotlin kubernetes apache-kafka openshift