【问题标题】:Apache Kafka (KStreams) : How to subscribe to multiple topics?Apache Kafka (KStreams):如何订阅多个主题?
【发布时间】:2019-10-26 02:11:48
【问题描述】:

我有以下代码

//Kafka Config setup
Properties props = ...; //setup

List<String> topicList = Arrays.asList({"A", "B", "C"});

StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);

source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});

new KafkaStream(builder.build(), properties).start();

我的问题:当我添加多个要订阅的主题(即上面的 A、B、C)时,Kstream 代码停止接收记录。

参考: https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html

相关文档

public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)

"If multiple topics are specified there is no ordering guarantee for records from different topics."

我想要实现的目标: 让一个 Kstream(即上面的“源”)从多个主题消费/处理。

【问题讨论】:

  • 您在日志中看到什么了吗?你启用调试日志了吗?此外,主题需要共同划分。这意味着每个主题的分区数、键和分区算法应该相同。
  • 您的代码看起来正确。我会查看 log4j 输出——应用程序是否正确重新平衡(即分配的任务等)。您是否监控输入主题的延迟?

标签: java apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

主题是否共享相同的密钥?

注意,指定的输入主题必须按key进行分区。如果 情况并非如此,重新分区是用户的责任 任何基于键的操作(如聚合或连接)之前的数据是 应用于返回的 KStream。

这可能是你的阻碍。

另一个可能的问题可能是使用的消费者组。

【讨论】:

  • 感谢您的帮助
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-20
  • 1970-01-01
  • 1970-01-01
  • 2017-05-16
  • 2021-10-14
  • 2014-01-21
  • 1970-01-01
相关资源
最近更新 更多