【发布时间】:2020-06-26 00:05:34
【问题描述】:
使用 kafka 流处理器 api
场景:流处理器(使用 kafka 流处理器 api 实现)从源主题读取数据 并根据一些业务逻辑将数据写入目标主题。
代码:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
Topology topology = new Topology();
topology.addSource("mySource", "source_topic");
topology.addProcessor("StreamsProcessor",()->new StreamsProcessor(), "mySource");
topology.addSink("sink1","output_topic","StreamsProcessor");
topology.addSink("sink2","output_topic2","StreamsProcessor");
topology.addSink("sink3","output_topic3","StreamsProcessor");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
--------------------------------------------------------------
public void init(ProcessorContext context)
{
this.context = context;
context.commit();
}
public void process(String key, String Value)
{
// In a loop send to sink1 sink2 or sink3
context.forward(key,Value,To.child("sink1"));
}
----------------------------------------------------------------
问题:
如果流处理器无法将消息发布到一个或多个目标 那么上面的主题是什么是重试机制的最佳方法 使用 kafka 流处理器 api 吗?
请分享代码 sn-ps/links/best practice 来处理失败场景。 谢谢。
【问题讨论】:
-
这取决于发生了什么类型的故障。失败的原因有很多:反序列化问题、处理事件期间、向目标 kafka 主题生成消息期间等。如果您的生产者 kafka 暂时不可用,Kafka Streams 提供使用
retries属性重试的能力,例如retries: 10。还请查看 Kafka Streams 中的错误处理:stackoverflow.com/a/51299739/2335775。
标签: apache-kafka apache-kafka-streams