【问题标题】:How to unit test Kafka Streams如何对 Kafka Streams 进行单元测试
【发布时间】:2017-08-30 09:56:09
【问题描述】:

在探索如何对 Kafka Stream 进行单元测试时,我遇到了 ProcessorTopologyTestDriver,不幸的是,这个类似乎已经被版本 0.10.1.0 (KAFKA-4408) 破坏了

是否有针对 KTable 问题的解决方法?

我看到了“Mocked Streams”项目,但首先它使用版本 0.10.2.0,而我使用的是 0.10.1.1,其次是 Scala,而我的测试是 Java/Groovy。

关于如何在无需引导 zookeeper/kafka 的情况下对流进行单元测试的任何帮助都会很棒。

注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,也就是快速、简单的测试。

编辑

感谢拉蒙·加西亚

对于通过 Google 搜索到达这里的人,请注意测试驱动程序类现在是 org.apache.kafka.streams.TopologyTestDriver

这个类在maven包groupId org.apache.kafka, artifactId kafka-streams-test-utils

【问题讨论】:

  • ProcessorTopologyTestDriver 是内部测试类,不是正式发布的一部分。您提到的 JIRA 已经修复——因此您可以从 Github 获取最新版本。
  • 如果使用了依赖项上的 test 限定符,则该类可用。我不能使用最新版本,因为我必须支持0.10.1.1
  • 以防万一您不知道@Hartimer:Kafka Streams 版本0.10.2.0 可以针对0.10.1.1 Kafka 代理运行,它是向后兼容的。因此,如果代理端版本要求是阻碍您支持 Kafka Streams 的原因——它不是。 ;-)
  • 很高兴知道! :) 谢谢。但是,如果我没看错的话,链接问题的修复与尚未发布的版本 0.11.0 相关联,对吧?

标签: apache-kafka apache-kafka-streams


【解决方案1】:

我找到了解决这个问题的方法,我不确定这是不是答案,尤其是在https://stackoverflow.com/users/4953079/matthias-j-sax 评论之后。无论如何,分享一下我到目前为止所拥有的......

我完全复制了ProcessorTopologyTestDriverfrom the 0.10.1 branch(这是我正在使用的版本)。

为了解决KAFKA-4408,我使private final MockConsumer<byte[], byte[]> restoreStateConsumer 可访问并将块task = new StreamTask(... 移动到一个单独的方法,例如bootstrap.

在测试的设置阶段,我执行以下操作

driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()

就是这样……

奖金

我还遇到了KAFKA-4461,幸运的是,因为我抄袭了整个课程,所以我能够通过细微的调整“挑选”accepted fix

一如既往地感谢您的反馈。虽然显然不是官方测试课程,但这个驱动程序被证明非常有用!

【讨论】:

  • 我想这是正确的解决方案——这就是我指出修复的原因... 有计划发布官方测试驱动程序——但还没有时间表——这应该比它更容易。
  • org.apache.kafka.test.ProcessorTopologyTestDriver 至少在 0.11 github.com/apache/kafka/blob/0.11.0.0/streams/src/test/java/org/…987654326@ 正式发布
  • 非常感谢 Dimitry 的跟进!
【解决方案2】:

对于通过 Google 搜索到达这里的人,请注意测试驱动程序类现在是 org.apache.kafka.streams.TopologyTestDriver

这个类在maven包groupId org.apache.kafka, artifactId kafka-streams-test-utils

【讨论】:

  • 添加进行此更改的版本会更有意义。 1.1.0
猜你喜欢
  • 2020-05-19
  • 1970-01-01
  • 2018-04-02
  • 2019-03-15
  • 1970-01-01
  • 1970-01-01
  • 2019-04-15
  • 2021-05-04
  • 2012-01-08
相关资源
最近更新 更多