[Question Title]: Handle failure in case the Kafka Broker is down
[Posted]: 2018-05-11 11:33:34
[Problem Description]:

I have a Kafka broker up and running, and messages are being consumed successfully, but I want to handle the case where the Kafka broker goes down, on the Kafka consumer side.

I have read this thread, but learned that those logs are only emitted at DEBUG level. I would like to know whether I can handle this manually via an event trigger, since I want to deal with the Kafka broker failure myself. Does Spring Kafka provide anything to handle this scenario?

Please let me know if more details are needed. I would really appreciate any suggestion that points me in the right direction. Thanks.

Edit 1:

As per @Artem's answer, I have tried this in my KafkaConsumer:

@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
    LOGGER.info("*****************************************");
    LOGGER.info("Hello NonResponsiveConsumer {}", event);
    LOGGER.info("*****************************************");     
}

This event fires once even when the Kafka server is running (when I first start the application). Please see the logs below:

....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [52.214.67.60:9091]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = workerListener
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....

Edit 2:

The problem was solved by upgrading spring-kafka to 1.3.2.

[Question Comments]:

    Tags: java spring spring-boot apache-kafka spring-kafka


    [Solution 1]:

    Starting with version 1.3.1, there is:

    /**
     * An event that is emitted when a consumer is not responding to
     * the poll; a possible indication that the broker is down.
     *
     * @author Gary Russell
     * @since 1.3.1
     *
     */
    @SuppressWarnings("serial")
    public class NonResponsiveConsumerEvent extends KafkaEvent {
    

    And quoting the documentation:

    Also, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received and idle events cannot be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed once every 30 seconds in each container. You can modify this behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event will allow you to stop the container, thus waking the consumer so it can terminate.
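    To make the threshold in the quoted docs concrete, here is a small self-contained sketch of the check being described: a consumer is considered non-responsive once the time since its last successful poll() exceeds a multiplier (3.0 per the docs, exposed as noPollThreshold) times the poll interval. Note this is an illustration of the logic only, not Spring Kafka's actual implementation; the class and method names are made up for this example.

    ```java
    import java.time.Duration;
    import java.time.Instant;

    // Illustrative model of the "non-responsive consumer" check; NOT
    // Spring Kafka's real code. With a 5 s poll interval and the
    // documented 3.0 multiplier, the effective threshold is 15 s.
    public class NonResponsiveCheck {

        private final Duration pollInterval;   // how long one poll() may take
        private final double noPollThreshold;  // multiplier, 3.0 per the docs

        public NonResponsiveCheck(Duration pollInterval, double noPollThreshold) {
            this.pollInterval = pollInterval;
            this.noPollThreshold = noPollThreshold;
        }

        /** True if the consumer has not returned from poll() for too long. */
        public boolean isNonResponsive(Instant lastPoll, Instant now) {
            long sinceLastPollMs = Duration.between(lastPoll, now).toMillis();
            long thresholdMs = (long) (pollInterval.toMillis() * noPollThreshold);
            return sinceLastPollMs > thresholdMs;
        }

        public static void main(String[] args) {
            NonResponsiveCheck check =
                    new NonResponsiveCheck(Duration.ofSeconds(5), 3.0);
            Instant now = Instant.now();
            // 10 s since last poll: under the 15 s threshold -> still responsive
            System.out.println(check.isNonResponsive(now.minusSeconds(10), now));
            // 20 s since last poll: over the threshold -> non-responsive
            System.out.println(check.isNonResponsive(now.minusSeconds(20), now));
        }
    }
    ```

    In a real application you would not run this check yourself: the container performs it every monitorInterval and publishes the NonResponsiveConsumerEvent, which you catch with an @EventListener (as in Edit 1 above) and react to, for example by stopping the container so the consumer can terminate.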

    [Comments]:

    • Hi, could you give me an example of how NonResponsiveConsumerEvent is used when the broker goes down?
    • Not sure what you mean, but it is a standard Spring ApplicationEvent. You just need an ApplicationListener to catch it and handle it according to your requirements.
    • Your problem may be related to this: github.com/spring-projects/spring-kafka/commit/…. Consider upgrading to 1.3.2.
    • OK. Although I am not sure what needs to be updated in the answer, I am sure it is time to accept it.
    • I just noticed that when printing the event object, whose type is NonResponsiveConsumerEvent, it shows a ListenerContainerIdleEvent object, as you can see in the logs above. Am I missing something?