【发布时间】:2017-07-28 18:01:03
【问题描述】:
我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
我想知道处理“坏消息”的最佳方法。这可能是我无法正确反序列化的消息之类的事情,或者处理/过滤逻辑可能以某种意想不到的方式失败(我没有外部依赖项,因此不应该出现此类暂时性错误)。
我正在考虑将我的所有处理/过滤代码包装在 try catch 中,如果引发异常,则路由到“错误主题”。然后我可以研究该消息并根据需要修改或修复我的代码,然后将其重播以掌握。如果我让任何异常传播,则流似乎会被阻塞,并且不会接收到更多消息。
- 这种方法是否被认为是最佳做法?
- 是否有方便的 Kafka 流方式来处理这个问题?我认为没有 DLQ 的概念...
- 有哪些替代方法可以阻止 Kafka 干扰“坏消息”?
- 有哪些替代错误处理方法?
为了完整起见,这里是我的代码(伪 ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
非常感谢任何帮助。
【问题讨论】:
-
1>
quarantine topic方法似乎有风险,因为糟糕的生产者可能会导致高开销,特别是如果该主题的多个消费者一直忙于将相同的格式错误的消息推送到该隔离主题 2> @987654324 @ 方法听起来更直观,并且可以使用 KStreamdoubled = input.flatMap( .. 验证 k和v的反序列化来最小化潜在的重新分区开销,并且具有必须反序列化的缺点(这次安全)再次密钥;因为密钥的(反序列化)成本远低于值的成本
标签: error-handling apache-kafka apache-kafka-streams