【问题标题】:Using Spring Integration with RabbitMQ使用 Spring 与 RabbitMQ 集成
【发布时间】:2012-05-08 06:42:23
【问题描述】:

我正在为我们的一个应用程序开发消息传递接口。应用程序是一种服务,旨在接受“作业”,进行一些处理并返回结果(实际上是以文件的形式)。

这个想法是使用 RabbitMQ 作为消息传递基础设施和 Spring AMQP 来处理协议特定的细节。

我不想让我的代码与 Spring AMQP 紧密耦合,所以我想使用 Spring Integration 来隐藏消息传递 API。所以基本上我想要这个:

消息发送到RabbitMQ ====> Spring AMQP ====> Spring Integration ====> MyService ====> 一路回复RabbitMQ

我正在尝试制定将其连接在一起所需的 XML 配置,但我遇到了多个抽象级别和不同术语的问题。事实证明,在 Spring AMQP/RabbitMQ 之上找到一个演示 Spring Integration 的工作示例非常困难,尽管这种设置对我来说感觉非常“最佳实践”。

1) 所以.. 有没有聪明的人可以快速浏览一下这个,或许可以把我推向正确的方向?我需要什么,不需要什么? :-)

2) 理想情况下,队列应该是多线程的,这意味着 taskExecutor 应该将多条消息传递给我的 jobService 以进行并行处理。需要什么配置?

 <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
    http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    ">

    <context:component-scan base-package="com.myprogram.etc" />

    <!-- Messaging infrastructure: RabbitMQ -->

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="${ei.messaging.amqp.servername}" />
        <property name="username" value="${ei.messaging.amqp.username}" />
        <property name="password" value="${ei.messaging.amqp.password}" />
    </bean>

    <rabbit:connection-factory id="connectionFactory" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- From RabbitMQ -->

    <int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/>

    <!-- Spring Integration configuration -->

    <int:channel id="fromAMQP">
        <!-- Is this necessary?? -->
        <int:queue/>
    </int:channel>

    <!-- JobService is a @Service with a @ServiceActivator annotation -->
    <int:service-activator input-channel="fromAMQP" ref="jobService"/>
</beans>

【问题讨论】:

    标签: rabbitmq amqp spring-integration


    【解决方案1】:

    我怀疑,我和你一样是 spring-integration 和 spring-integration-amqp 的菜鸟,但我确实得到了部分基于一个示例项目的工作。

    对于 rabbitmq 基础设施,我有以下几点:

    <rabbit:connection-factory id="rabbitConnectionFactory"/>
    
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>
    
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
    <!-- some attributes seemed to be ok with queue name, others required id
      -- so I used both with the same value -->
    <rabbit:queue id='test.queue' name='test.queue'/>
    
    <rabbit:direct-exchange name:"my.exchange">
        <rabbit:bindings>
            <rabbit:binding queue="test.queue" key="test.binding"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    

    要向rabbitmq发送消息,我有以下内容:

    <!-- This is just an interface definition, no implementation required
      -- spring will generate an implementation which puts a message on the channel -->
    <int:gateway id="backgroundService", 
             service-interface="com.company.BackgroundService"
                 default-request-channel="toRabbit"
    
    <int:channel id:"toRabbit"/>
    
    <!-- used amqpTemplate to send messages on toRabbit channel to rabbitmq -->
    <int-amqp:outbound-channel-adapter channel:"toRabbit" 
                                   amqp-template="amqpTemplate" 
                       exchange-name="my.exchange" 
                       routing-key="test.binding"/>
    

    要接收消息,我有以下内容:

    <int:service-activator input-channel="fromRabbit" 
                           ref="testService" 
                           method="serviceMethod"/>
    
    
    // from rabbitmq to local channel
    <int-amqp:inbound-channel-adapter channel="fromRabbit" 
                                      queue-names="test.queue" 
                                      connection-factory="rabbitConnectionFactory"/>
    
    <int:channel id="fromRabbit"/>
    

    一些警告 - spring-integration 中 amqp 集成的文档说可以同步发送和接收返回值,但我还没有弄清楚。当我的 service-activator 方法返回一个值时,它会引发异常,将消息放回rabbitmq(并生成一个无限循环,因为它会再次接收消息并再次引发异常)。

    我的 BackgroundService 界面如下所示:

    package com.company
    
    import org.springframework.integration.annotation.Gateway
    
    public interface BackgroundService {
    
        //@Gateway(requestChannel="someOtherMessageChannel")
        public String sayHello(String toWho)
    
    }
    

    如果您不想使用 spring bean 中配置的默认通道,您可以通过注解在每个方法上指定一个通道。

    附加到服务激活器的服务如下所示:

    package com.company;
    
    class TestService {
    
        public void serviceMethod(String param) {
        log.info("serviceMethod received: " + param");
        //return "hello, " + param;
        }
    }
    

    当我在不涉及rabbitmq 的情况下将所有东西都连接到本地时,调用者正确接收了返回值。当我去rabbitmq频道时,当返回一个值后抛出异常时,我得到了前面提到的无限循环。这肯定是可能的,否则如果不修改代码就不可能在不同的通道中连接,但我不确定诀窍是什么。如果您弄清楚了,请回复解决方案。显然,您可以根据需要在端点之间放置您喜欢的任何路由、转换和过滤。

    如果我上面的 XML 摘录中有错别字,请不要感到惊讶。我必须从 groovy DSL 转换回 xml,所以我可能会犯错误。但意图应该足够清楚。

    【讨论】:

    • 我注意到咖啡馆样本也有一个启用 amqp 的配置,并从中找出了双向(返回值)通信。不要配置入站通道适配器和出站通道适配器,而是配置 int-amqp:inbound-gateway 和 int-amqp:outbound-gateway。您可以只更改标记名称并保留我上面示例中的所有属性(但将“通道”重命名为“请求通道”)。您显然可以指定一个特定的回复通道而不是使用默认通道,但是当我尝试这样做时出现错误。
    • 嗨,你能做到吗?我有类似的问题stackoverflow.com/questions/24037271/…
    猜你喜欢
    • 2015-12-24
    • 1970-01-01
    • 2012-04-22
    • 2019-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-24
    相关资源
    最近更新 更多