【问题标题】:Kafka automation Java when service is producing the messages服务生成消息时的 Kafka 自动化 Java
【发布时间】:2021-07-06 21:34:28
【问题描述】:

我想自动化 API 服务生成的 Kafka 消息。有人可以提供您对此的见解吗?

  1. 触发 REST API 请求
  2. kafka 消息作为生产者发布到日志中
  3. 需要从日志中获取这些消息并验证它们

【问题讨论】:

  • 哪些消息?哪个api服务?您需要提供有关您正在尝试做什么的更多详细信息?
  • 您提出的问题就像我们在您的团队中工作一样。我们现在对您的需求一无所知,请提供更多详细信息,如@KevinHooke 所说..
  • 对不起,伙计们,更新了我的问题,这是非常基本的,因为这是第一次尝试自动化 kafka 消息,已经研究了以下一些内容:1.knowledge.zerocode.io/knowledge/kafka-testing-introduction 2.baeldung.com/spring-boot-kafka-testing 但我想要上面描述中提到的东西。
  • 仍然不清楚您所说的“自动化”是什么意思。听起来您想模拟/调用包含 Kafka 生产者的 REST API……如果是这样,请使用 Wiremock 作为包含 Kafka 生产者的起点。否则,请编辑您的问题以包含您遇到的代码和确切问题

标签: java testing apache-kafka automation message


【解决方案1】:

终于得到了我自己和团队的答案,并在这里发布给任何后代, 从下面的第 2 行开始,一旦返回消息,需要编写自己的断言来验证 kafka 消息是否有效以及 JSON 格式是否正确。

KafkaPicker kafkaPicker = new KafkaPicker("Charging", properties);
ConsumerRecords<Object, Object> messages = kafkaPicker.returnAll();
System.out.println("Charging topic messages are : " + messages);
kafkaPicker.close();


public class KafkaPicker implements AutoCloseable {
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
private final String topicName;
private final KafkaConsumer<Object, Object> consumer;

public KafkaPicker(String topicName, Properties properties) {
    this.topicName = topicName;
    consumer = new KafkaConsumer<>(properties);
}

public void readSpecific(Long offset, Integer partition) {
    TopicPartition topicPartition = new TopicPartition(topicName, partition);
    consumer.assign(Collections.singletonList(topicPartition));
    consumer.seek(topicPartition, offset);
    ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
    records.forEach(a -> System.out.println(a.value()));
}

public void readAll() {
    TopicPartition topicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> partitions = Collections.singletonList(topicPartition);
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
    records.forEach(a -> System.out.println(a.value()));
}

public ConsumerRecords<Object, Object> returnAll() {
    TopicPartition topicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> partitions = Collections.singletonList(topicPartition);
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    boolean size = true;
    ConsumerRecords<Object, Object> records = null;
    while (size) {
        records = consumer.poll(POLL_TIMEOUT);
        if (records.count() != 0)
            size = false;
    }
    return records;
}

public void close() {
    consumer.close();
}

【讨论】:

    猜你喜欢
    • 2017-11-06
    • 2016-12-07
    • 2021-02-16
    • 2021-01-20
    • 1970-01-01
    • 2017-08-27
    • 1970-01-01
    • 2021-10-11
    相关资源
    最近更新 更多