【问题标题】:Configuring a Spring Integration aggregator to combine responses from a RabbitMq fanout exchange配置 Spring Integration 聚合器以组合来自 RabbitMq 扇出交换的响应
【发布时间】:2012-10-19 01:41:44
【问题描述】:

我正在尝试使用 Spring Integration 配置以下内容:

  1. 向频道发送消息。
  2. 将此消息发布到与 n 个消费者的兔子扇出 (pub/sub) 交换。
  3. 每个消费者都提供一条响应消息。
  4. 让 Spring Integration 在将这些响应返回给原始客户端之前对其进行聚合。

到目前为止,我遇到了一些问题......

  1. 我正在使用发布-订阅通道来设置apply-sequence="true" 属性,以便设置correlationId、sequenceSize 和sequenceNumber 属性。这些属性被DefaultAmqpHeaderMapper 丢弃。 DEBUG headerName=[correlationId] WILL NOT be mapped

  2. sequenceSize 属性仅设置为 1,即使在扇出交换中注册了 2 个队列。大概这意味着消息会过早地从聚合器中释放。我希望这是因为我滥用发布-订阅频道来使用apply-sequence="true",而且说只有一个订阅者int-amqp:outbound-gateway 是非常正确的。

我的出站 Spring 配置如下:

<int:publish-subscribe-channel id="output" apply-sequence="true"/>

<int:channel id="reply">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:aggregator input-channel="reply" method="combine">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

<int:logging-channel-adapter id="logger" level="INFO"/>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"/>

我的 rabbitMQ 配置如下:

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>

<rabbit:fanout-exchange name="fanout-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="a-queue" />
        <rabbit:binding queue="b-queue" />
    </rabbit:bindings>
</rabbit:fanout-exchange>

消费者看起来像这样:

<int:channel id="input"/>

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>

<bean id="listenerService" class="example.ListenerService"/>

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

任何建议都会很棒,我怀疑我在某处弄错了棍子……

基于 Gary 的 cmets 的新出站 spring 配置:

<int:channel id="output"/>

<int:header-enricher input-channel="output" output-channel="output">
    <int:correlation-id expression="headers['id']" />
</int:header-enricher>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"
                                   mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>

<int:channel id="reply"/>

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

【问题讨论】:

    标签: java rabbitmq spring-integration spring-amqp


    【解决方案1】:

    问题在于 S.I. 不知道扇出交换的拓扑结构。

    解决此问题的最简单方法是使用自定义发布策略

    release-strategy-expression="size() == 2"
    

    在聚合器上(假设扇出为 2)。因此,您不需要序列大小;您可以避免使用 header-enricher 来“滥用”发布/订阅频道...

        <int:header-enricher input-channel="foo" output-channel="bar">
            <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
        </int:header-enricher>
    

    您可以通过使用已经唯一的消息 id 来避免创建新的 UUID...

    <int:correlation-id expression="headers['id']" />
    

    最后,您可以通过添加将correlationId标头传递给AMQP

    mapped-request-headers="correlationId"
    

    到您的 amqp 端点。

    【讨论】:

    • 谢谢加里,这让我更进一步,现在的问题是我的出站网关似乎没有等待对消息的响应。消费者正在接收消息(来自 fanout 交换),我可以看到他们都在回复同一个 rabbitmq 队列(来自 DEBUG),但我没有收到发件人的回复。我必须将 amqp* 添加到 mapped-request-headers 属性中,否则标准 amqp 标头会丢失。
    • 我没有注意到您正在使用网关 - 网关只处理对请求的单个回复。您将需要使用出站适配器来发送请求并使用入站适配器来接收回复。您需要手动填充标头,以便 2 个消费者上的入站网关知道如何回复。
    【解决方案2】:

    即使这个问题已经 3 年了,我还是会回答它,因为我有同样的问题。

    Spring Integration 有一个 Scatter-Gather 的实现,听起来很像您原来的问题。

    这是来自Spring Documentation的相关部分

    它是一个复合端点,其目标是将消息发送到 收件人并汇总结果....

    以前,可以使用离散组件配置模式, 此增强带来了更方便的配置。

    ScatterGatherHandler 是一个请求-回复端点,它结合了一个 PublishSubscribeChannel(或 RecipientListRouter)和一个 聚合消息处理程序。请求消息被发送到分散器 通道和 ScatterGatherHandler 等待来自 聚合器发送到 outputChannel。

    【讨论】: