【问题标题】:Apache Kafka with High Level Consumer: Skip corrupted messages具有高级消费者的 Apache Kafka:跳过损坏的消息
【发布时间】:2015-12-30 11:59:21
【问题描述】:

我遇到了高级 kafka 消费者 (0.8.2.0) 的问题 - 在消耗了一些数据后,我们的一个消费者停止了。重新启动后,它会消耗一些消息并再次停止,没有错误/异常或警告。

经过一番调查,我发现消费者的问题是这个异常:

ERROR c.u.u.e.impl.kafka.KafkaConsumer  - Error consuming message stream:
 kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)

有什么想法可以简单地跳过这些消息吗?

【问题讨论】:

  • 我认为没有办法跳过损坏的消息。找到了一个讨论 here 可能值得一看

标签: apache-kafka kafka-consumer-api


【解决方案1】:

所以,回答我自己的问题。在对 Kafka Consumer 进行了一些调试后,我找到了一种可能的解决方案:

  1. 创建kafka.consumer.ConsumerIterator 的子类
  2. 覆盖makeNext-方法。在这个方法中捕获 InvalidMessageException 并返回一些虚拟占位符。
  3. 在您的while-loop 中,您必须将kafka.consumer.ConsumerIterator 转换为您的实现。不幸的是,kafka.consumer.ConsumerIterator 的所有字段都是私有的,因此您必须使用反射。

这是代码示例:

val skipIt = createKafkaSkippingIterator(ks.iterator())

while(skipIt.hasNext()) {
  val messageAndTopic = skipIt.next()

  if (messageNotCorrupt(messageAndTopic)) {
    consumeFn(messageAndTopic)
  }
}

messageNotCorrupt-方法只检查参数是否等于虚拟消息。

【讨论】:

    【解决方案2】:

    另一种解决方案,可能更简单,使用 Kafka 0.8.2 客户端。

    try {
      val m = it.next()
      //...
    } catch {
      case e: kafka.message.InvalidMessageException ⇒
        log.warn("Corrupted message. Skipping.", e)
        resetIteratorState(it)
    }
    
    //...
    
    def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = {
      val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState")
      method.setAccessible(true)
      method.invoke(it)
    }
    

    【讨论】:

      猜你喜欢
      • 2015-06-05
      • 1970-01-01
      • 2014-02-13
      • 2012-03-11
      • 2016-09-01
      • 2017-07-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多