【发布时间】:2021-04-11 00:48:35
【问题描述】:
我看到 Spring Cloud Stream 的以下注释已被贬值
@Input
@Output
@EnableBinding
@StreamListener
请提供示例和文档链接,说明如何以实用的方式进行操作。
【问题讨论】:
标签: spring spring-cloud-stream
我看到 Spring Cloud Stream 的以下注释已被贬值
@Input
@Output
@EnableBinding
@StreamListener
请提供示例和文档链接,说明如何以实用的方式进行操作。
【问题讨论】:
标签: spring spring-cloud-stream
spring 现在不再使用基于注释的配置,而是使用检测到的 Consumer/Function/Supplier bean 为您定义流。 旧版本带有注释的代码如下所示:
interface InputChannels {
@Input("input")
SubscribableChannel input();
}
@EnableBinding(InputChannels.class)
public class PubSubDemo {
@StreamListener("input")
public void listen() {
if (LOG.isInfoEnabled()) {
LOG.info(context.toString());
}
}
新版本代码如下:
public class PubSubDemo {
@Bean
Consumer<String> input() {
return str -> {
if (LOG.isInfoEnabled()) {
LOG.info(context.toString());
}
};
}
}
检查 Consumer bean 替换了 @StreamListener 和 @Input。
关于配置,如果之前为了配置,您的 application.yml 看起来像这样:
spring:
cloud:
stream:
bindings:
input:
destination: destination
group: group
consumer:
concurrency: 10
max-attempts: 3
现在新的配置会是这样的
spring:
cloud:
stream:
bindings:
input-in-0:
destination: destination
group: group
consumer:
concurrency: 10
max-attempts: 3
in 和out 对应于绑定的类型(例如输入或输出)。 index 是输入或输出绑定的索引。对于典型的单输入/输出函数,它始终为 0。
现在让我们考虑输出通道:
public interface OutputChannels {
@Output
MessageChannel output();
}
@Service
@EnableBinding(OutputChannels.class)
class PubSubSendQueue {
OutputChannels outputChannel;
public void publish() {
outputChannel.output().send("Hello");
}
}
现在函数代码如下:
@Service
class PubSubSendQueue {
@Bean
public Supplier<String> output(){
return Supplier { "Adam" }
}
}
【讨论】:
这个 github 存储库包含很多示例..
https://github.com/spring-cloud/stream-applications
官方文档详细解释了如何在使用 kafka 流的 Spring Cloud Stream 应用程序中从命令式转换为函数式,但没有它也是一样的。
也请查看此帖子..
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
有一个命令式代码示例 (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) 以及如何使用函数式风格开发它。
【讨论】:
这里有一些更有用的信息:
发送消息
使用 org.springframework.cloud.stream.function.StreamBridge 发送消息。
之前
myDataSource.output().send(message);
之后
streamBridge.send("myData-out-0", message);
更换 ServiceActivator
之前
@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }
之后
@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
return message -> {...};
}
不要忘记在属性 spring.cloud.function.definition 中注册“myCoolFunction”。
【讨论】: