【问题标题】:How to wait for Kafka respone in a Jest test with KafkaJS?如何在 KafkaJS 的 Jest 测试中等待 Kafka 响应?
【发布时间】:2021-01-21 11:25:45
【问题描述】:

考虑这个测试,其中一条消息从测试发送到主题“out”,并且测试的代码预计会使用它并通过向主题“in”发送消息来回复。为了通过,我想确保将消息发送到主题“in”。

it('...', async () => {
  /* initialize kafkaConsumer and kafkaProducer here */

  async function someCallback() {
    // ...
  }

  await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
  await kafkaConsumer.run({ eachMessage: someCallback })

  await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })

  // How do I block here until someCallback is called?
})

我读到了关于使用done 的信息,但是我不能拥有它,因为测试本身是定义async 的,我需要它才能使用await。有没有我不知道的不同方式?

【问题讨论】:

  • 嗨,您介意分享更多您为此使用的代码,例如设置生产者、主题、kafka 服务器等。
  • @KalyanChavali 看看我刚刚在下面发布的答案

标签: node.js jestjs kafkajs


【解决方案1】:

您可以查看我们如何测试 KafkaJS 本身以获得一些灵感。例如,here's a basic consumer test

我们真的没有做任何花哨的事情,只是从eachMessage 回调中将消息添加到数组中,然后等待一个定期检查我们是否达到预期消息数量的承诺。像这样的:

it('consumes messages', async () => {
  const messages = [{ value: 'hello world' }]
  const consumedMessages = []

  consumer.run({
    eachMessage: ({ message }) => {
      consumedMessages.push(message);
    }
  })

  await producer.send({ topic, messages })

  await waitFor(() => consumedMessages.length === messages.length)
})

waitFor 本质上是一个函数,它返回一个 Promise 并启动一个 setTimeout 来检查谓词并在谓词为真时解析该 Promise(或者如果遇到超时则拒绝)。

需要记住的一些问题:

  • 在每次运行时使用新的groupId,这样多次运行就不会相互干扰。
  • 出于同样的原因,在每次测试运行时使用一个新主题。
  • 如果您在消费者加入群组并订阅主题之前生成消息,则默认情况下不会显示这些消息。使用fromBeginning: true 订阅或等待您的消费者在生产之前订阅并加入组(检测事件在组加入时发出一个事件,您可以像我们等待消息被消费一样等待)。

【讨论】:

  • 我最终做了类似的事情。谢谢:)
  • 如何导入这个waitFor?在任何地方都找不到任何例子
  • 我还需要 testhelpers 来在我的项目中创建 kafka 测试吗?
  • @KalyanChavali 你可以在timeout 承诺和消费承诺之间使用Promise.race 自己实现waitForpastebin.com/Aiu66rdH
  • @KalyanChavali 如果您在测试库中使用 jest,您也可以使用 wait-for-expect 模块
【解决方案2】:

在使用 Tommy Brunn 的回答一段时间后,我发现了一些错误,我最终得到了这个:

export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  const consumer: Consumer = kafka.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })

  let resolveOnConsumption: (messages: KafkaMessage[]) => void
  let rejectOnError: (e: Error) => void

  const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
    resolveOnConsumption = resolve
    rejectOnError = reject
  }).finally(() => consumer.disconnect()) // disconnection is done here, reason why is explained below

  const messages: KafkaMessage[] = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ message, partition, topic }) => {
      try {
        // eachMessage is called by eachBatch which can consume more than messagesAmount.
        // This is why we manually commit only messagesAmount messages.
        if (messages.length < messagesAmount) {
          messages.push(message)

          // +1 because we need to commit the next assigned offset.
          await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
        }

        if (messages.length === messagesAmount) {
          // I think we should be able to close the connection here, but kafkajs has a bug which makes it hang if consumer.disconnect is called too soon after consumer.run .
          // This is why we close it in the promise's finally block

          resolveOnConsumption(messages)
        }
      } catch (e) {
        rejectOnError(e)
      }
    },
  })

  return returnThisPromise
}

【讨论】:

  • 嗯,但是我们如何总是产生不同的主题呢?我从我的函数中产生,主题来自 process.env。消费者使用相同的主题...您是决定将这个新主题传递给您的所有函数还是使用源连接器以便您不用担心?
  • 我刚刚使用我的数据模型的 ID 创建了新主题,并确保它在我的所有测试中都是唯一的。在我的主要功能中,我有 process.env.TOPIC || post.id。请注意,由于主题命名约定,您可能会遇到错误。 @chikchakchok 也是非常好的方法。我用了和你一样的方法。非常感谢!!!!
猜你喜欢
  • 2017-09-14
  • 1970-01-01
  • 1970-01-01
  • 2017-12-25
  • 1970-01-01
  • 1970-01-01
  • 2015-11-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多