【发布时间】:2019-10-24 07:45:02
【问题描述】:
大家好,特别是春季团队!
如何以函数式 Bean 编程模型风格将 spring-cloud-function 与 spring-cloud-stream 管道化?
例如,我有两个依赖项的 pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>
假设我接下来想做:
- 通过 spring-cloud-function (webflux) 通过 http 有效负载字符串发送
- 使用我的 toUpperCase 函数将其大写
- 最后发送到我的管道以安装 binder (kafka/rabbit/test-binder)
所以我希望像这样实现它:
@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {
/**
* can I sent result of that function to my broker without any
* explicitly defined output.send(...) execution?
*/
@Bean
public Function<String, String> toUpperCase() {
return arg -> {
var res = arg.toUpperCase();
log.info("toUpperCase: {}", res);
return res;
};
}
public static void main(String[] args) {
SpringApplication.run(
SpringCloudFunctionStreamApplication.class,
"--spring.cloud.function.definition=toUpperCase",
"--spring.cloud.stream.function.definition=toUpperCase"
);
}
}
所以当我使用 HTTPie 发送有效负载时,如下所示:
echo 'hello' | http :8080/toUpperCase
spring-cloud-function 似乎工作正常,我可以看到预期的日志:
2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello
如果我通过 rabbitmq 管理 web ui 发布消息也是一样的想法,但是我如何从一个管道传输到另一个
所以我的问题与 according to spring documentation which says that I can use spring-cloud-stream as well:Wrappers for @Beans 类型的 Function、Consumer 和 Supplier 相关,将它们作为 HTTP 端点和/或带有 RabbitMQ、Kafka 等的消息流侦听器/发布者向外界公开,但我看不懂怎么办?
目前,不幸的是,我只能使用 Source see example here 手动将消息发布到 spring-cloud-stream binder,但我当然想知道 spring 是否可以神奇地避免...
请任何人告诉我(可能是 Gary Russell、Dave Sawyer、Artem Bilan、Oleg Zhurakousky 或其他任何知道的人):我错过了什么以及我应该如何配置我的应用程序或者我应该在我的 application.properties 中添加哪些道具等?
谢谢!
更新
已经有一段时间了,但我决定在这里发布一个解决方案......
简而言之:固定提交在这里:https://github.com/daggerok/spring-cloud-function-stream-integration/commit/35325465b81bb869c31ec7892f413ab891d6d0fd
所以基本上使用 StreamBridge 我可以将 spring-cloud-function 与 spring-cloud-stream 连接起来......我可以在适当的目的地发送任何想要的东西,可以直接从我的 spring-cloud 主体配置所需的管道链功能
在提到的更新存储库中查看详细信息(如果您愿意,可以修复分支)
干杯!
问候, 马克西姆
【问题讨论】:
-
马克西姆,如果我理解正确,你想要
http -> function(s) -> rabbit对吗? -
你好奥列格!是的,我只想为两者提供函数定义:spring-cloud-function 和 spring-cloud-stream 并在某些地方配置数据流管道......所以无论 spring-cloud-function 将由某人通过休息触发,它的输出应该是管道根据使用rabbit/karfka的配置,在spring-cloud-stream旁边转发。目前我只能手动将数据从函数转发到队列中,但如果我不应该注入 Source 并使用它手动将数据传递到队列中,那就太棒了
标签: spring-webflux spring-cloud-stream spring-cloud-function