【Posted】: 2016-08-23 12:30:08
【Problem description】:
I have a simple cluster setup on AWS with a single Kafka instance and a single ZooKeeper. I am writing <String, String> records to it and trying to aggregate those values over a 10-second window.
The error messages I am getting:
DEBUG o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.
with the cluster metadata version incrementing ad infinitum.
Code:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String> dbwriteTable = lines.aggregateByKey(
        new DBAggregateInit(),
        new DBAggregate(),
        TimeWindows.of("write_aggregate", 10000));

dbwriteTable.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
DBAggregateInit and DBAggregate are stubbed out to just log to DEBUG whenever they are hit; they have no other functionality.
Neither of these stub functions ever gets hit.
Not sure what step I'm missing here. If I just .foreach() or do a simple read of the topic, it seems to work fine.
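For reference, the windowed aggregation the code above is aiming for can be sketched in plain Java, independent of Kafka Streams: each record falls into a 10-second tumbling window bucket keyed by (key, window start), and an aggregate is built up per window, much like what the changelog topic would hold. The concatenating aggregator and the in-memory store layout here are illustrative assumptions, since DBAggregate itself is just a logging stub:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only sketch of a 10-second tumbling-window aggregation.
// The concatenating aggregator is an assumption for illustration; the
// question's DBAggregateInit/DBAggregate stubs only log to DEBUG.
public class WindowAggregateSketch {
    static final long WINDOW_MS = 10_000L;

    // (key, window start) -> running aggregate, mirroring windowed state
    final Map<String, String> store = new LinkedHashMap<>();

    void process(String key, String value, long timestampMs) {
        // Tumbling window: bucket the timestamp to its window start.
        long windowStart = (timestampMs / WINDOW_MS) * WINDOW_MS;
        String windowedKey = key + "@" + windowStart;
        // "Initializer" is the first value; "Aggregator" concatenates.
        store.merge(windowedKey, value, (agg, v) -> agg + "," + v);
    }

    public static void main(String[] args) {
        WindowAggregateSketch s = new WindowAggregateSketch();
        s.process("user1", "a", 1000);    // window [0, 10000)
        s.process("user1", "b", 9000);    // same window, gets aggregated
        s.process("user1", "c", 12000);   // window [10000, 20000)
        s.store.forEach((k, v) -> System.out.println(k + " = " + v));
        // prints:
        // user1@0 = a,b
        // user1@10000 = c
    }
}
```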
FWIW:
I ran into similar partition issues when I let Kafka create the topic itself, as opposed to creating it with kafka-topics --create --topic ...
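For illustration, pre-creating the topic explicitly would look something like the following (the ZooKeeper host and topic name are placeholders; a replication factor of 1 matches the single-broker setup described above):

```shell
# Pre-create the topic instead of relying on broker auto-creation.
# Host and topic name below are placeholders, not from the question.
kafka-topics.sh --create \
  --zookeeper <zookeeper-host>:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic <topic>
```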
【Discussion】:
Tags: java apache-kafka apache-kafka-streams