【发布时间】:2024-05-26 01:25:01
【问题描述】:
如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?
不推荐使用的方式如下所示。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
但是我怎样才能以函数式风格发送消息呢?
application.yml
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
我会自动装配一个 MessageChannel,但没有 MessageChannel-Bean 用于进程、process-out-0、输出或类似的东西。或者我可以用供应商 Bean 发送消息吗? 有人可以给我一个例子吗? 非常感谢!
【问题讨论】:
标签: spring-boot functional-programming spring-kafka spring-cloud-stream-binder-kafka