【问题标题】:How to implement HTTP request/reply when the response comes from a rabbitMQ reply queue using Spring Integration DSL?当响应来自使用 Spring Integration DSL 的 rabbitMQ 回复队列时,如何实现 HTTP 请求/回复?
【发布时间】:2021-05-29 05:40:21
【问题描述】:

我正在尝试在 Spring Integration DSL 中使用单独的 RabbitMQ 队列来实现 HTTP 请求/回复。它类似于Spring IntegrationFlow http request to amqp queue。不同之处在于我希望将响应返回给原始的 http 调用者。我可以看到测试 http post 消息成功传递到请求队列并转换(转换为大写)到响应队列。该消息也从响应队列中消耗,但从未返回给调用者(http://localhost:8080/Tunner)。最终调用超时,出现 500 错误。我是新手,所以可能有一些我完全错过的东西。有人可以提供建议吗?代码如下:

public class TunnelApplication
{
    public static void main(String[] args)
    {
        SpringApplication.run(TunnelApplication.class, args);
    }

    @Value("${outboundQueue}")
    private String outboundQueue;

    @Value("${inboundQueue}")
    private String inboundQueue;

    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    public TunnelApplication(ConnectionFactory factory) {
        rabbitConnectionFactory = factory;
    }

    @Bean
    public Queue targetQueue()
    {
        return new Queue(outboundQueue, true, false, true);
    }

    @Bean
    public Queue requestQueue()
    {
        return new Queue(inboundQueue, true, false, true);
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
        result.setMessageConverter(jsonMessageConverter());
        result.setDefaultReceiveQueue(outboundQueue);
        //result.setReplyAddress(outboundQueue);
        result.setReplyTimeout(60000);
        return result;
    }

    @Bean
    public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
        return IntegrationFlows
                .from(Http.inboundGateway("/tunnel"))
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey(inboundQueue)
                        .returnChannel(amqpOutboundChannel()))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
                .transform(String.class, String::toUpperCase)
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
                .handle(Http.outboundGateway("http://localhost:8080/tunnel")
                       .expectedResponseType(String.class))
                .log()
                .bridge(null)
                .channel(amqpOutboundChannel())
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

我们也尝试了以下代码(由我的同事编写),但我们也没有得到响应:

@Configuration
@EnableIntegration
public class FlowConfig {

 

   @Value("${routingKey}")
   private String routingKey;

 

   @Value("${rabbitSinkChannel}")
   private String rabbitSinkChannel;

 

   @Bean
   public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
      return
         Amqp
         .channel(rabbitSinkChannel, connectionFactory)
         .get();
   }

 

   @Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      return new RabbitTemplate(connectionFactory);
   }

 

   @Bean
   public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
      MessageChannel rabbitSinkChannel = rabbitSinkChannel(connectionFactory);

 

      return IntegrationFlows
         .from(
            Http.inboundGateway("/sendreceive")
         )
         .handle(
            Amqp.outboundGateway(rabbitTemplate)
               .routingKey(routingKey)
               .returnChannel(rabbitSinkChannel)
         )
         .channel(rabbitSinkChannel) // or .handle? if so, what?

 

         .get();
   }
}

【问题讨论】:

    标签: rabbitmq spring-integration spring-cloud-stream spring-cloud-dataflow spring-integration-dsl


    【解决方案1】:

    您可能误解了Amqp.outboundGateway 上的returnChannel 是什么,并尝试依赖您的逻辑。请熟悉发布商确认和退货功能:https://docs.spring.io/spring-amqp/docs/current/reference/html/#cf-pub-conf-ret

    也不清楚replyBackToHttp 流的目的是什么,但目前它与对其他 bean 的混合引用混淆了。

    您可能需要分别调查 Spring AMQP 的请求-回复配置是什么,并且您可能不会尝试使用另一个队列进行回复。虽然还是可以的:见replyAddress属性或RabbitTemplatehttps://docs.spring.io/spring-amqp/docs/current/reference/html/#request-reply

    【讨论】:

    • 谢谢阿特姆!我们将检查您建议的文件。好吧,我们确实需要为此使用另一个队列。实际上,我们的初衷是使用 Spring Cloud 数据流来替换我们现有的需要请求/回复的 ESB SOAP 服务。然而,由于 SCDF 流是单向的,我们必须按照 Gary 在stackoverflow.com/questions/43307078/… 中的建议添加 Spring Integration 部分。不确定是否有其他方法可以实现此目的。
    • 正确。 Spring Cloud Stream 不公开请求-回复模式,因为它不适合流式架构。当然,作为 Spring Cloud Data Flow 处理器实现的一部分,您始终可以使用 Spring Integration 来满足您的要求。您可以针对固定回复队列使用 AMQP 出站网关(客户端),但在服务器端,您应确保在 AMQP 入站和出站通道适配器之间正确携带相关标头。
    • 嗨 Artem,我能够通过更新 amqpTemplate() 方法、添加一个 replyListenerContainer() 方法和删除 replyBackToHttp() 方法来使其工作。请参阅我发布的答案。谢谢你的帮助!下一步是设置 SCDF 流进行测试。
    【解决方案2】:

    以下更新有效(我还删除了 replyBackToHttp() 方法):

    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        //result.setDefaultReceiveQueue(outboundQueue);
        rabbitTemplate.setReplyAddress(outboundQueue);
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }
    
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        container.setQueues(replyQueue());
        container.setMessageListener((MessageListener) amqpTemplate());
        return container;
    }
    

    【讨论】:

      【解决方案3】:

      对于那些感到沮丧并只想继续前进的人来说,这是几乎完整的解决方案。

      package com.scdf.poc.config;
      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.integration.amqp.dsl.Amqp;
      import org.springframework.integration.config.EnableIntegration;
      import org.springframework.integration.dsl.IntegrationFlow;
      import org.springframework.integration.dsl.IntegrationFlows;
      import org.springframework.integration.http.dsl.Http;
      @Configuration
      @EnableIntegration
      public class FlowConfig {
         @Value("${rabbitSource}")
         private String rabbitSource;
         @Value("${rabbitSink}")
         private String rabbitSink; // Note: Has to be manually created in rabbit mq, the SCDF flows won't auto create this
         @Bean
         public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setReplyAddress(rabbitSink);
            return rabbitTemplate;
         }
         @Bean
         public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(rabbitSink);
            container.setMessageListener(rabbitTemplate);
            return container;
         }
         @Bean
         public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate) {
            return IntegrationFlows
               .from(
                  Http.inboundGateway("/sendreceive")
                     .requestPayloadType(String.class)
               )
               .handle(
                  Amqp.outboundGateway(rabbitTemplate)
                     .routingKey(rabbitSource)
               )
               .get();
         }
      }
      

      application.properties - 注意 SCDF 使用流名称作为队列名称的前缀和后缀

      rabbitSource=pocStream.rabbitSource.pocStream
      rabbitSink=pocStream.rabbitSink.pocStream
      

      pocStream 的 SCDF 流定义 - 这只是回显请求

      rabbitSource: rabbit --queues=rabbitSource | bridge | rabbitSink: rabbit --routing-key=pocStream.rabbitSink.pocStream
      

      【讨论】:

        猜你喜欢
        • 2016-11-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-02-22
        • 2022-12-01
        • 2018-11-22
        相关资源
        最近更新 更多