【问题标题】:Kafka stream : Is there a way to ignore specific offsets in a topic partition while writing to another topicKafka流:有没有办法在写入另一个主题时忽略主题分区中的特定偏移量
【发布时间】:2018-10-25 15:45:33
【问题描述】:

背景:我在生产到 prod 主题时使用了错误的 avro 模式注册表,结果 kafka 连接由于模式 id 错误的消息而关闭。因此,作为恢复计划,我们希望将 prod 主题中的消息复制到一个测试主题,然后将好的消息写入 hdfs。但是在从 prod 主题读取时,我们面临某些偏移量有错误模式 id 的问题。有没有办法在写入另一个主题时忽略这些偏移量。

 Exception in thread "StreamThread-1" 
 org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value 
 for record. topic=xxxx, partition=9, offset=1259032
  Caused by: org.apache.kafka.common.errors.SerializationException: Error 
  retrieving Avro schema for id 600
  Caused by: 

  io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
   Schema not found io.confluent.rest.exceptions.RestNotFoundException: Schema not found
  io.confluent.rest.exceptions.RestNotFoundException: Schema not found

{代码}

【问题讨论】:

    标签: apache-kafka apache-kafka-streams apache-kafka-connect


    【解决方案1】:

    您可以更改反序列化异常处理程序以跳过这些记录,如文档中所述:https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records

    也就是说,你通过参数default.deserialization.exception.handler在配置中设置LogAndContinueExceptionHandler

    【讨论】:

      猜你喜欢
      • 2019-07-20
      • 1970-01-01
      • 2020-08-16
      • 2018-07-13
      • 2016-12-30
      • 1970-01-01
      • 2022-10-31
      • 1970-01-01
      • 2015-04-03
      相关资源
      最近更新 更多