【问题标题】:Spring Cloud Stream Function : Invoke Function<T,R> via REST call and output it to a KAFKA TopicSpring Cloud Stream 函数:通过 REST 调用调用 Function<T,R> 并将其输出到 KAFKA 主题
【发布时间】:2020-06-21 09:25:28
【问题描述】:

我有简单的@Bean(Java 8 函数)映射到目标topic-out-in)。

@Bean
public Function<String, String> transform() {
    return payload -> payload.toUpperCase();
}

@Bean
public Consumer<String> receive() {
    return payload -> logger.info("Data received: " + payload);
}

.yml 配置:

spring:
  cloud:
    stream:
      function:
        definition: transform;receive
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic

现在,我想通过REST 调用调用transform 函数,以便它的输出转到destination topic(即transform-out-0 映射到myTopic)并被consumer 拾取从这个目的地(receive-in-0 映射到 myTopic)。基本上,每个 REST 调用都应该生成一个 KAFKA Producer 的新实例并关闭它。

请问如何使用spring-cloud-stream 实现这一点?

谢谢

昂舒曼

【问题讨论】:

    标签: java java-8 spring-cloud-stream spring-cloud-function


    【解决方案1】:

    您应该使用 StreamBridge 而不是 transform 函数。这是 Spring Cloud Stream 中动态目的地的新推荐方法。 基本思路如下:

    @Autowired
    private StreamBridge streamBridge;
    
    @RequestMapping
    public void delegateToSupplier(@RequestBody String body) {
        streamBridge.send("transform-out-0", body);
    }
    

    然后通过配置提供这个属性-spring.cloud.stream.source: transform

    Spring Cloud Stream 将为您创建一个名为 transform-out-0 的输出绑定。每次调用 REST 端点时,都会通过StreamBridge 将数据发送到目标主题。

    欲了解更多信息,请参阅this

    【讨论】:

    • 啊,谢谢。那么,假设我是 IBM MQ 上的消息侦听器,我想将消息转发到 KAFKA,我将使用 StreamBridge 吗?这也是直接调用Producer 的推荐方法吗(在本机api 中,我们执行Producer.send(...)?否则,我看到Supplier bean 总是以1 秒的间隔被无限调用。我已经替换了transformSupplier&lt;T&gt; 并注意到,不断轮询。
    • 如果你是 MQ 上的监听器,想要将消息转发到 Kafka,则不需要StreamBridge。一个简单的函数方法应该与您的原始代码一样工作。您无需直接处理Producer,由 Spring Cloud Stream Kafka binder 为您处理。没有什么可以阻止您直接使用生产者 API,但我认为您的用例不需要。
    • 关于Supplier的第二个问题,您可以使用此处列出的属性控制轮询频率:github.com/spring-cloud/spring-cloud-stream/blob/master/…
    • 谢谢。我现在得到供应商投票部分。抱歉,我对 MQ 消息转发有点迷茫。因此,对于任何Foreign event-driven sources,无论是 REST 还是 MQ 侦听器,文档说,请使用 streambridge。使用我上面的函数式方法,如果没有streambridge,它将如何转发到Kafka Topic?
    • 对不起,我误解了MQ部分。这是您想要对其采取行动的一种来源(供应商)。 StreamBridgeSupplier 在这种情况下应该可以工作。
    猜你喜欢
    • 2017-11-20
    • 2020-03-24
    • 2018-12-29
    • 1970-01-01
    • 2021-10-13
    • 2019-04-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多