【问题标题】:Functional programming model bean definition and spring-cloud-function + spring-cloud-stream integration函数式编程模型bean定义与spring-cloud-function + spring-cloud-stream集成
【发布时间】:2019-10-24 07:45:02
【问题描述】:

大家好,特别是春季团队!

如何以函数式 Bean 编程模型风格将 spring-cloud-function 与 spring-cloud-stream 管道化?

例如,我有两个依赖项的 pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>

假设我接下来想做:

  1. 通过 spring-cloud-function (webflux) 通过 http 有效负载字符串发送
  2. 使用我的 toUpperCase 函数将其大写
  3. 最后发送到我的管道以安装 binder (kafka/rabbit/test-binder)

所以我希望像这样实现它:

@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {

  /**
   * can I sent result of that function to my broker without any
   * explicitly defined output.send(...) execution?
   */
  @Bean
  public Function<String, String> toUpperCase() {
    return arg -> {
      var res = arg.toUpperCase();
      log.info("toUpperCase: {}", res);
      return res;
    };
  }

  public static void main(String[] args) {
    SpringApplication.run(
      SpringCloudFunctionStreamApplication.class,
      "--spring.cloud.function.definition=toUpperCase",
      "--spring.cloud.stream.function.definition=toUpperCase"
    );
  }
}

所以当我使用 HTTPie 发送有效负载时,如下所示:

echo 'hello' | http :8080/toUpperCase

spring-cloud-function 似乎工作正常,我可以看到预期的日志:

2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello

如果我通过 rabbitmq 管理 web ui 发布消息也是一样的想法,但是我如何从一个管道传输到另一个

所以我的问题与 according to spring documentation which says that I can use spring-cloud-stream as well:Wrappers for @Beans 类型的 Function、Consumer 和 Supplier 相关,将它们作为 HTTP 端点和/或带有 RabbitMQ、Kafka 等的消息流侦听器/发布者向外界公开,但我看不懂怎么办?

目前,不幸的是,我只能使用 Source see example here 手动将消息发布到 spring-cloud-stream binder,但我当然想知道 spring 是否可以神奇地避免...

请任何人告诉我(可能是 Gary Russell、Dave Sawyer、Artem Bilan、Oleg Zhurakousky 或其他任何知道的人):我错过了什么以及我应该如何配置我的应用程序或者我应该在我的 application.properties 中添加哪些道具等?

谢谢!

更新

已经有一段时间了,但我决定在这里发布一个解决方案......

简而言之:固定提交在这里:https://github.com/daggerok/spring-cloud-function-stream-integration/commit/35325465b81bb869c31ec7892f413ab891d6d0fd

所以基本上使用 StreamBridge 我可以将 spring-cloud-function 与 spring-cloud-stream 连接起来......我可以在适当的目的地发送任何想要的东西,可以直接从我的 spring-cloud 主体配置所需的管道链功能

在提到的更新存储库中查看详细信息(如果您愿意,可以修复分支)

干杯!


问候, 马克西姆

【问题讨论】:

  • 马克西姆,如果我理解正确,你想要http -&gt; function(s) -&gt; rabbit 对吗?
  • 你好奥列格!是的,我只想为两者提供函数定义:spring-cloud-function 和 spring-cloud-stream 并在某些地方配置数据流管道......所以无论 spring-cloud-function 将由某人通过休息触发,它的输出应该是管道根据使用rabbit/karfka的配置,在spring-cloud-stream旁边转发。目前我只能手动将数据从函数转发到队列中,但如果我不应该注入 Source 并使用它手动将数据传递到队列中,那就太棒了

标签: spring-webflux spring-cloud-stream spring-cloud-function


【解决方案1】:

马克西姆

这并不理想,但鉴于我们在现有绑定器的范围内实现了初始功能支持,因此存在一些限制。我会解释,但首先是功能齐全的代码:

@SpringBootApplication
public class SimpleFunctionRabbitDemoApplication  {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
            "--spring.cloud.stream.function.definition=uppercase");
    }

    @Autowired
    private Processor processor;

    @Bean
    public Consumer<String> consume() {
        return v -> processor.input().send(MessageBuilder.withPayload(v).build());
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

基本上有一点不匹配。在流方面,我们有活页夹,在功能方面,我们有适配器。您正在有效地(根据您的要求)尝试将两者连接到管道中。所以。 . .

让我们先看看活页夹。

大写函数绑定到消息通道绑定器(rabbit或kafka)提供的inputoutput通道,有效地创建了内部管道input -&gt; uppercase -&gt; output。它也被 s-c-function 暴露为 REST 端点,但是,s-c-function 无权访问上述管道。事实上,它实际上有自己的管道request -&gt; uppercase -&gt; reply。 所以我们需要做的是将这两个概念连接在一起,这实际上就是我所做的。

  • 您使用Processor 绑定注入您的应用程序,其中包含对uppercase 绑定到的通道的引用。

  • 您通过 REST http://localhost:8080/consume/blah 调用 consume()

  • 你向uppercase函数的输入通道发送消息

为了简化这一点,我们只需要创建一个类似 binder 的 Web 适配器版本,因此请随时提出功能请求。但正如您所看到的,当前的解决方法并不是全部。

【讨论】:

  • 您好 Oleg Zhurakousky,感谢您的反馈!我有两个问题:1)在这种情况下我们真的需要处理器吗?如果我们只需要将数据放入管道中,我认为 Source 就足够了。 2) 你错过了@EnableBinding(Processor.class) 还是不再需要它?例如你提供的,我收到一个错误:Dispatcher has no subscribers for channel 'application.integrationFlowCreator.channel#0'.;嵌套异常是 org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
  • 是的,来源就足够了。关于 EnableBinding,是的,在上述情况下不需要,但您需要最新版本的流或添加 EnableBInding 回
  • 我终于把它修好了,就像我从一开始就想要的那样)使用 StreamBridge 的新的 spring 云流功能方法很好。以防万一有人感兴趣:github.com/daggerok/spring-cloud-function-stream-integration/…
【解决方案2】:

这是 Oleg Zhurakousky 的一个问题。如果能回答会很高兴

如果我使用@Bean Supplier&lt;Pojo&gt;... 绑定输出目标,每次将新的Pojo 发送到Kafka/Rabbit 时,如何从@Service 类或@Controller 类调用它。

Supplier 只公开了一个get() 方法。

我只写生产者,它将向 Kafka 写入自定义 Pojo,而另一个应用程序是消费者。对于 Consumer&lt;Pojo&gt;...,函数式方法更清晰,它只会从 Kafka 中读取并处理。生产者的Supplier&lt;Pojo&gt;... 部分不清楚。

https://www.youtube.com/watch?v=nui3hXzcbK0&t=3478s

【讨论】:

【解决方案3】:

@阿布舍克

您可以按照here 的说明使用EmitterProcessor。提供的示例使用 rest 端点作为他的实际数据源,但正如您所见,它不会计量,因为您需要做的就是调用 EmitterProcessoronNext 操作,从 Service 传递您的事件。

【讨论】:

    猜你喜欢
    • 2019-03-08
    • 2020-12-22
    • 1970-01-01
    • 2020-02-10
    • 1970-01-01
    • 2020-10-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多