【问题标题】:@EnableBinding @deprecated as of 3.1 in favor of functional programming model@EnableBinding @deprecated 自 3.1 起支持函数式编程模型
【发布时间】:2021-04-11 00:48:35
【问题描述】:

我看到 Spring Cloud Stream 的以下注释已被贬值

@Input @Output @EnableBinding @StreamListener

请提供示例和文档链接,说明如何以实用的方式进行操作。

【问题讨论】:

    标签: spring spring-cloud-stream


    【解决方案1】:

    spring 现在不再使用基于注释的配置,而是使用检测到的 Consumer/Function/Supplier bean 为您定义流。 旧版本带有注释的代码如下所示:

    interface InputChannels {
        
            @Input("input")
            SubscribableChannel input();
        }
    
        @EnableBinding(InputChannels.class)
        public class PubSubDemo {
        @StreamListener("input")
        public void listen() {
                
            if (LOG.isInfoEnabled()) {
                LOG.info(context.toString());
            }
        }
    

    新版本代码如下:

    public class PubSubDemo {
       @Bean
       Consumer<String> input() {
        return str -> {
            
            if (LOG.isInfoEnabled()) {
                LOG.info(context.toString());
                 }
            };
       }
    }
    

    检查 Consumer bean 替换了 @StreamListener@Input

    关于配置,如果之前为了配置,您的 application.yml 看起来像这样:

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: destination
              group: group
              consumer:
                concurrency: 10
                max-attempts: 3
    

    现在新的配置会是这样的

    spring:
      cloud:
        stream:
          bindings:
            input-in-0:
              destination: destination
              group: group
              consumer:
                concurrency: 10
                max-attempts: 3
    

    in 和out 对应于绑定的类型(例如输入或输出)。 index 是输入或输出绑定的索引。对于典型的单输入/输出函数,它始终为 0。

    现在让我们考虑输出通道:

        public interface OutputChannels {
            @Output
            MessageChannel output();
        }
    
        @Service
        @EnableBinding(OutputChannels.class)
        class PubSubSendQueue {
        
        OutputChannels  outputChannel;
    
        public void publish() {
            outputChannel.output().send("Hello");
        }
    }
    

    现在函数代码如下:

    @Service
    class PubSubSendQueue {
        @Bean
        public Supplier<String> output(){
            return Supplier { "Adam" }
        }
    }
    

    【讨论】:

    • 感谢您的示例,这对我帮助很大。我已经了解了该方法是如何被引用的,最后我发现 input-in-0: 输入实际上是方法名称。还将@Service 添加到消费者示例中,只是为了有一个工作示例。再次感谢!
    【解决方案2】:

    【讨论】:

      【解决方案3】:

      这里有一些更有用的信息:

      发送消息

      使用 org.springframework.cloud.stream.function.StreamBridge 发送消息。

      之前

      myDataSource.output().send(message);
      

      之后

      streamBridge.send("myData-out-0", message);
      

      更换 ServiceActivator

      之前

      @ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
      public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }
      

      之后

      @Bean
      Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
          return message -> {...}; 
      }
      

      不要忘记在属性 spring.cloud.function.definition 中注册“myCoolFunction”。

      【讨论】: