【发布时间】:2023-03-23 10:50:01
【问题描述】:
我应该测试我的代码,以便通过嵌入式“withRunningKafka”使用来自 kafka-server 的所有消息,如下所示:https://github.com/manub/scalatest-embedded-kafka
- 我尝试通过创建的嵌入式生产者向主题发送消息。
- 我尝试通过项目中的代码使用生成的消息(由嵌入式生产者创建)。
“使用自定义生产者和消费者进行测试”应该{
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
问题在这里:'ok' 没有给出'Done' 的结果。 一般来说,我测试消费者的逻辑是否正确?
【问题讨论】:
标签: scala apache-kafka scalatest producer-consumer alpakka