【发布时间】:2019-10-26 02:11:48
【问题描述】:
我有以下代码
//Kafka Config setup
Properties props = ...; //setup
List<String> topicList = Arrays.asList({"A", "B", "C"});
StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);
source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});
new KafkaStream(builder.build(), properties).start();
我的问题:当我添加多个要订阅的主题(即上面的 A、B、C)时,Kstream 代码停止接收记录。
参考: https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html
相关文档
public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)
"If multiple topics are specified there is no ordering guarantee for records from different topics."
我想要实现的目标: 让一个 Kstream(即上面的“源”)从多个主题消费/处理。
【问题讨论】:
-
您在日志中看到什么了吗?你启用调试日志了吗?此外,主题需要共同划分。这意味着每个主题的分区数、键和分区算法应该相同。
-
您的代码看起来正确。我会查看 log4j 输出——应用程序是否正确重新平衡(即分配的任务等)。您是否监控输入主题的延迟?
标签: java apache-kafka kafka-consumer-api apache-kafka-streams