【发布时间】:2018-07-13 22:21:11
【问题描述】:
我已经设置了一个简单的聚合平均来自多个流的值,我正在尝试对其进行测试。我已经花费了很多时间,但我似乎无法将这些概念直接放在脑海中。我的信息流如下:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
我的测试使用 EmbeddedKafka,将一堆记录发布到主题,然后坐在阻塞队列中等待记录到达sinkTopic。
我对这种聚合如何随时间变化感兴趣,因此我希望在每个输出代码中断言该平均值。我可能会添加某种程度的窗口,但我现在尽量保持简单。
当我运行测试时,我得到了不同的结果。假设我的拓扑中有 10 条输入记录:
- 我的聚合器被调用了 10 次
- 我在
AverageTick序列化程序中放置的断点被调用了不同的次数。 - 我在我的测试中断言记录的值。
我认为这是因为KIP-63 中定义的缓存功能 - 记录很快就会出现在处理节点上,并与最新记录合并/覆盖。 (虽然我并不完全确定。)
我有通过ProcessorTopologyTestDriver 的单元测试,但我正在尝试为包含此逻辑的服务编写一些验收测试。
我也尝试过使用我的 commit.interval.ms 配置,以及在发布我的输入记录之间放置睡眠,以取得不同程度的(片状)成功。
- 这些测试有意义吗?
- 如何针对真实的 Kafka 实例断言此微服务的正确性?
我觉得我在这里做一些概念上的错误 - 我只是不知道要采取什么其他方法。
【问题讨论】:
标签: apache-kafka apache-kafka-streams