【问题标题】:Apache Kafka with High Level Consumer: Skip corrupted messages具有高级消费者的 Apache Kafka:跳过损坏的消息
【发布时间】:2015-12-30 11:59:21
【问题描述】:
我遇到了高级 kafka 消费者 (0.8.2.0) 的问题 - 在消耗了一些数据后,我们的一个消费者停止了。重新启动后,它会消耗一些消息并再次停止,没有错误/异常或警告。
经过一番调查,我发现消费者的问题是这个异常:
ERROR c.u.u.e.impl.kafka.KafkaConsumer - Error consuming message stream:
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)
有什么想法可以简单地跳过这些消息吗?
【问题讨论】:
-
我认为没有办法跳过损坏的消息。找到了一个讨论 here 可能值得一看
标签:
apache-kafka
kafka-consumer-api
【解决方案1】:
所以,回答我自己的问题。在对 Kafka Consumer 进行了一些调试后,我找到了一种可能的解决方案:
- 创建
kafka.consumer.ConsumerIterator 的子类
- 覆盖
makeNext-方法。在这个方法中捕获 InvalidMessageException 并返回一些虚拟占位符。
- 在您的
while-loop 中,您必须将kafka.consumer.ConsumerIterator 转换为您的实现。不幸的是,kafka.consumer.ConsumerIterator 的所有字段都是私有的,因此您必须使用反射。
这是代码示例:
val skipIt = createKafkaSkippingIterator(ks.iterator())
while(skipIt.hasNext()) {
val messageAndTopic = skipIt.next()
if (messageNotCorrupt(messageAndTopic)) {
consumeFn(messageAndTopic)
}
}
messageNotCorrupt-方法只检查参数是否等于虚拟消息。
【解决方案2】:
另一种解决方案,可能更简单,使用 Kafka 0.8.2 客户端。
try {
val m = it.next()
//...
} catch {
case e: kafka.message.InvalidMessageException ⇒
log.warn("Corrupted message. Skipping.", e)
resetIteratorState(it)
}
//...
def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = {
val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState")
method.setAccessible(true)
method.invoke(it)
}