【问题标题】:KStream windowed aggregation - partition problemsKStream 窗口聚合 - 分区问题
【发布时间】:2016-08-23 12:30:08
【问题描述】:

我在 AWS 上有一个简单的集群设置,有一个 kafka 实例和一个 zookeeper。我正在为此写<String, String> 并努力在 10 秒的窗口中聚合这些值。

我收到的错误消息:

DEBUG  o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG  org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG  o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.

cluster metadata# 无限前进。

代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String>  dbwriteTable = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate", 10000));

dbwriteTable.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

DBAggregateInitDBAggregate 在遇到任何问题时被删除以记录到 DEBUG。没有其他功能。

这些存根函数都不会被命中。

不确定我在这里错过了哪些步骤。如果我.foreach() 或对该主题进行简单阅读,它似乎可以正常工作。

FWIW:

当我让 kafka 创建主题而不是使用 kafka-topic --create --topic ... 时,我遇到了类似的分区问题。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    我相信这种错误是由于我以不同的用户身份运行 zookeeper 和 kafka,并且各种数据文件夹中可能存在权限问题。

    一旦这两个服务都以 root 身份运行并且所有相关数据文件都被删除/重新创建,这些错误就消失了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-09-05
      • 1970-01-01
      • 1970-01-01
      • 2019-10-15
      • 1970-01-01
      • 2018-09-18
      • 1970-01-01
      相关资源
      最近更新 更多