【问题标题】:How to test Kafka Consumer with Embedded-Kafka-lib, exactly with 'withRunningKafka'?如何使用 Embedded-Kafka-lib 测试 Kafka Consumer,完全使用“withRunningKafka”?
【发布时间】:2023-03-23 10:50:01
【问题描述】:

我应该测试我的代码,以便通过嵌入式“withRunningKafka”使用来自 kafka-server 的所有消息,如下所示:https://github.com/manub/scalatest-embedded-kafka

  1. 我尝试通过创建的嵌入式生产者向主题发送消息。
  2. 我尝试通过项目中的代码使用生成的消息(由嵌入式生产者创建)。

“使用自定义生产者和消费者进行测试”应该{

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

问题在这里:'ok' 没有给出'Done' 的结果。 一般来说,我测试消费者的逻辑是否正确?

【问题讨论】:

    标签: scala apache-kafka scalatest producer-consumer alpakka


    【解决方案1】:

    欢迎使用 stackoverflow!

    原因ok 从未完成并有结果,因为源正在等待可能的进一步消息。在map前添加.take(2),源将在两个元素后停止,让okfuture完成。

    【讨论】:

    • dvim 不幸的是,它不起作用。我的测试方法对吗?
    • 我注意到的另一件事是ok 的类型是Future[Done],您将它与Done 进行比较。您应该从未来提取值进行比较。您可以通过将 ScalaFutures 从 scalatest 混合到您的测试类来做到这一点,然后执行 ok.futureValue shouldBe Done
    【解决方案2】:

    我认为您同时面临两个问题:

    1. Kafka 消费者无限等待元素(如@dvim 所说),因此您需要 .take() 才能真正结束

    2. 默认情况下,kafka 消费者组将从当前主题的末尾开始,而不是开始,因此不会消耗在旋转之前发布的消息。您需要设置一个设置,使其从主题的开头而不是结尾开始。

    【讨论】:

    • 我的消费者在 localhost:9092 工作,而我的生产者在内存工作,不是吗?
    • 对不起,我不明白你的评论。在kafka.apache.org/documentation中搜索“auto.offset.reset”
    猜你喜欢
    • 2020-10-17
    • 2020-04-09
    • 2017-02-05
    • 2015-08-31
    • 2018-11-27
    • 1970-01-01
    • 2023-03-02
    • 1970-01-01
    • 2021-07-15
    相关资源
    最近更新 更多