【问题标题】:Apache Kafka - REST API based consumer?Apache Kafka - 基于 REST API 的消费者?
【发布时间】:2020-11-11 12:55:38
【问题描述】:

我跟随 this article 构建了一个简单的 Java Spring Boot 应用程序来与 Apache Kafka 一起工作。

如下所述定义生产者控制器

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

和生产者服务如下

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

如下所述的消费者服务

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Producer.class);

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }
}

配置如下图

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

它按预期工作

并在控制台上打印消息

我想要一个基于 REST API 的消费者,而不是仅仅在控制台上打印消息。我该怎么做?

【问题讨论】:

  • 不清楚您所说的“基于 REST API 的消费者”是什么意思。如果您的意思是要使用 REST 请求中的一些记录,请参阅this answer
  • 是的,我需要一个可以从主题中检索最新记录/消息的端点。我确实通过了提供的链接。您能否让我知道如何转换代码,以便我有一个 Rest 端点来检索消息/记录。我是java新手。

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

How to read a message from Kafka topic on demand

只需将代码(以try 开头)放入@GetMapping 方法中即可。

【讨论】:

    猜你喜欢
    • 2020-10-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-18
    • 1970-01-01
    • 1970-01-01
    • 2020-11-14
    相关资源
    最近更新 更多