【问题标题】:Handling failed messages using kafka streams processor api使用 kafka 流处理器 api 处理失败的消息
【发布时间】:2020-06-26 00:05:34
【问题描述】:

使用 kafka 流处理器 api

场景:流处理器(使用 kafka 流处理器 api 实现)从源主题读取数据 并根据一些业务逻辑将数据写入目标主题。

代码:

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
  props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

  Topology topology = new Topology();
  topology.addSource("mySource", "source_topic");
  topology.addProcessor("StreamsProcessor",()->new StreamsProcessor(), "mySource"); 
  topology.addSink("sink1","output_topic","StreamsProcessor");
  topology.addSink("sink2","output_topic2","StreamsProcessor");
  topology.addSink("sink3","output_topic3","StreamsProcessor");

  KafkaStreams streams = new KafkaStreams(topology, props);
  streams.start();
  --------------------------------------------------------------
  public void init(ProcessorContext context) 
  {
      this.context = context;
      context.commit();
  }

  public void process(String key, String Value) 
  {   
      // In a loop send to sink1 sink2 or sink3
      context.forward(key,Value,To.child("sink1"));
  }
  ----------------------------------------------------------------

问题:

如果流处理器无法将消息发布到一个或多个目标 那么上面的主题是什么是重试机制的最佳方法 使用 kafka 流处理器 api 吗?
请分享代码 sn-ps/links/best practice 来处理失败场景。 谢谢。

【问题讨论】:

  • 这取决于发生了什么类型的故障。失败的原因有很多:反序列化问题、处理事件期间、向目标 kafka 主题生成消息期间等。如果您的生产者 kafka 暂时不可用,Kafka Streams 提供使用retries 属性重试的能力,例如retries: 10。还请查看 Kafka Streams 中的错误处理:stackoverflow.com/a/51299739/2335775

标签: apache-kafka apache-kafka-streams


【解决方案1】:

您可以使用某种 kafkaProducer 作为 messageFailureHandler 并使用它您可以将所有失败的消息发送到专用的 kafka 主题。

如果你熟悉 kafka-connect 中的死信队列的概念,它是一样的(除了在 kafka-connect 中它只是一个配置问题)。

【讨论】:

  • 感谢 Yoni,但我正在寻找与 kafka 流处理器 api 有关的解决方案。也请分享一些链接、sn-ps、doc等。
  • 例如,这可以让您真正了解如何实现 kafka 生产者dzone.com/articles/take-a-deep-dive-into-kafka-producer-api ...如果您希望 kstreams 解决方案具有获取所有失败消息的主题,我认为这可能是一个不错的解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-12-18
  • 2017-07-28
  • 2019-08-04
  • 1970-01-01
  • 2019-06-21
  • 2016-07-02
  • 1970-01-01
相关资源
最近更新 更多