【发布时间】:2018-05-02 13:18:32
【问题描述】:
我有一个 Kafka 主题 t1,其中包含 100 条消息。在 flatMapValues() 操作之后,结果被写入第二个主题 t2。 当我运行以下命令时:
int count = 0;
t2.groupByKey().count().toStream().foreach((key, value) -> System.out.println(++count));
count 的最终值为 100,这是预期的结果。
但是,当我对一个有 500 条消息的主题应用相同的逻辑时(flatMapValues() + 写入另一个主题 + 按键分组),count 的最终值略大于 500。 如果主题有 10K 条消息,差距会变得更大。
可能是什么问题?
【问题讨论】:
-
为什么要数两次?
foreach是否应该不打印System.out.println(value)? -
我没有计算两次@MatthiasJ.Sax,我通过增加变量“count”的值来计算结果流中的记录数
-
好吧,“两次”这个词不合适。为什么要在两个地方并行计算? -- 另外,如果您执行 flatMapValues(),您发出的记录是否会多于输入记录?如果没有,为什么不使用 mapValues() ?
-
@MatthiasJ.Sax 是的 flatMapValues() 肯定会发出更多记录,这就是为什么 groupByKey() 应该在 flatMapValues() 之前返回完全相同数量的记录。 count() 只是对 KGroupedStream 上的聚合函数的虚拟调用,以获取 KTable 和 KStream。
-
从你原来的问题看来,你打电话给
stream.flatMapValues().groupByKey().count()...也许你可以分享整个代码?我假设没有失败?请注意,Kafka 在默认情况下保证至少处理一次。
标签: apache-kafka apache-kafka-streams