【问题标题】:Can I test kafka-streams suppress logic?我可以测试 kafka-streams 抑制逻辑吗?
【发布时间】:2019-08-28 06:55:12
【问题描述】:

我的应用程序使用 kafka 流 suppress 逻辑。

我想使用抑制测试 kafka 流拓扑。

正在运行 uinit 测试,我的拓扑没有发出结果。

Kafka 流逻辑

...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...

我的测试用例代码。 创建输入数据后,运行测试用例无法读取抑制逻辑输出记录。 只需返回null

testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));

System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));

我可以测试我的抑制逻辑吗?

【问题讨论】:

标签: apache-kafka apache-kafka-streams


【解决方案1】:

简单的答案是肯定的。

一些很好的参考是Confluent Example Tests 这个例子特别测试了抑制功能。许多其他示例始终是首先检查的好地方。这是我用Kotlin 写的另一个例子。

可以在blog post的帖子 3 中找到该功能的说明和测试

一些关键点:

  • 该窗口只会发出文档中预期的最终结果。
  • 要刷新最终结果,您需要发送一个额外的虚拟事件,如 confluents here 等示例中所示。
  • 您需要操纵事件时间来测试它,因为抑制在事件时间上起作用,这可以由测试输入主题 API 提供或使用自定义 TimestampExtractor
  • 对于测试,我建议设置以下内容以删除缓存并减少提交间隔。

    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0 道具[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5

希望这会有所帮助。

【讨论】:

  • 感谢您的回答和示例,确实帮助我完成了我正在从事 atm 的项目
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-23
  • 2017-06-09
  • 1970-01-01
  • 2021-05-04
  • 2018-04-02
  • 2019-05-29
相关资源
最近更新 更多