这取决于你想对生产者端的异常做什么。
如果生产者抛出异常(例如由于网络故障或 kafka 代理已死亡),则流将默认死亡。并且使用 kafka-streams 1.1.0 版,您可以通过实现 ProductionExceptionHandler 来覆盖默认行为,如下所示:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
如果您不希望流因异常而死亡,则可以从句柄方法返回CONTINUE,如果您希望流停止(默认为FAIL),则返回FAIL。
你需要在流配置中指定这个类:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
还要注意ProductionExceptionHandler只处理producer上的异常,在处理消息的流式方法mapValues(..)、filter(..)、branch(..)等时不会处理异常,你需要用try包装这些方法逻辑/catch 块(将你所有的方法逻辑放入 try 块中,以保证你将处理所有异常情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
据我所知,我们不需要显式处理消费者端的异常,因为 kafka 流稍后会自动重试消费(因为在消息被消费和处理之前,偏移量不会改变);例如如果在一段时间内无法访问 kafka 代理,您将从 kafka 流中得到异常,并且当中断时,kafka 流将消耗所有消息。所以在这种情况下,我们只会有延迟,没有任何损坏/丢失。
使用setUncaughtExceptionHandler,您将无法像使用ProductionExceptionHandler 那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题。
更新自kafka-streams2.8.0
由于kafka-streams2.8.0,您可以自动替换失败的流线程(由未捕获的异常引起)
使用KafkaStreams 方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); 和StreamThreadExceptionResponse.REPLACE_THREAD。更多详情请关注Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});