【问题标题】:Spring Integration - Gateway - Splitter - Aggregator with JMSSpring 集成 - 网关 - 拆分器 - 带有 JMS 的聚合器
【发布时间】:2013-06-13 23:54:13
【问题描述】:

我正在尝试使用 Spring 集成以由 JMS 支持的事件驱动方式执行 Gateway --> Splitter-->ServiceActivator --> Aggregator Pattern。我希望服务激活器是多线程的,并且任何端点都可以在集群上执行,而不必是原始服务器。我可以在不使用 JMS(使用 SI 通道)的情况下使其在单个 JVM 中工作,但我知道 SI 通道不会帮助我水平扩展,即多个 VM。

这是我目前的配置

    <int:gateway id="transactionGateway" default-reply-channel="transaction-reply"
    default-request-channel="transaction-request" default-reply-timeout="10000"
    service-interface="com.test.abc.integration.service.ProcessGateway">
</int:gateway>

<int-jms:outbound-gateway id="transactionJMSGateway"
    correlation-key="JMSCorrelationID" request-channel="transaction-request"
    request-destination="transactionInputQueue" reply-channel="transaction-reply"
    reply-destination="transactionOutputQueue" extract-reply-payload="true"
    extract-request-payload="true">
    <int-jms:reply-listener
        max-concurrent-consumers="20" receive-timeout="5000"
        max-messages-per-task="1" />
</int-jms:outbound-gateway>

<!-- Inbound Gateway for Splitter -->
<int-jms:inbound-gateway id="splitterGateWay"
    request-destination="transactionInputQueue" request-channel="splitter-input"
    reply-channel="splitter-output" concurrent-consumers="1"
    default-reply-destination="processInputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    correlation-key="JMSCorrelationID" extract-request-payload="true" />

<!-- Inbound Gateway Invokes Service Activator and Sends response back to 
    the channel -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue" request-channel="process-input"
    reply-channel="process-output" concurrent-consumers="1"
    default-reply-destination="processOutputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    correlation-key="JMSCorrelationID" extract-request-payload="true"
    max-messages-per-task="1" />

<int-jms:inbound-gateway id="aggregatorGateway"
    request-destination="processOutputQueue" request-channel="aggregator-input"
    reply-channel="aggregator-output" concurrent-consumers="1"
    default-reply-destination="transactionOutputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    extract-request-payload="true" max-messages-per-task="1"
    correlation-key="JMSCorrelationID" />


<int:splitter id="transactionSplitter" input-channel="splitter-input"
    ref="processSplitter" output-channel="splitter-output">
</int:splitter>

<int:service-activator id="jbpmServiceActivator"
    input-channel="process-input" ref="jbpmService" requires-reply="true"
    output-channel="process-output">
</int:service-activator>

<int:aggregator id="transactionAggregator"
    input-channel="aggregator-input" method="aggregate" ref="processAggregator"
    output-channel="aggregator-output" message-store="processResultMessageStore"
    send-partial-result-on-expiry="false">
</int:aggregator>

在使用网关之前,我尝试使用 JMS 支持的通道,但这种方法也没有成功。我现在面临的问题是 Splitter 现在回复 transactionOutputQueue 。我尝试使用 jms:header-enricher 并没有取得多大成功。我觉得我解决问题/SI 的方法可能存在根本缺陷。非常感谢任何帮助/指导。

另外,在我上面提供的代码 sn-p 中,我使用了一个简单的内存聚合器,我知道如果我需要让它在整个集群中工作,我可能需要一个 JDBC 支持的聚合器,但就目前而言,我是试图让这种模式在单个 VM 上工作

这是根据 Gary 的评论更新的工作配置

<bean id="processOutputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.process.output" />
</bean>

<bean id="transactionOutputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.result" />
</bean>

<bean id="transactionInputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.input" />
</bean>

<int:gateway id="transactionGateway"
    default-request-channel="transaction-request" default-reply-timeout="10000"
    default-reply-channel="aggregator-output"
    service-interface="com.test.abc.integration.service.ProcessGateway">
</int:gateway>

<int:splitter id="transactionSplitter" input-channel="transaction-request"
    ref="processSplitter" output-channel="splitter-output">
</int:splitter>


<int-jms:outbound-gateway id="splitterJMSGateway"
    correlation-key="JMSCorrelationID" request-channel="splitter-output"
    request-destination="processInputQueue" reply-channel="aggregator-input"
    reply-destination="processOutputQueue" extract-request-payload="true"
    extract-reply-payload="true">
    <int-jms:reply-listener
        max-concurrent-consumers="20" receive-timeout="5000" />
</int-jms:outbound-gateway>

<!-- Inbound Gateway Invokes Service Activator and Sends response back to 
    the channel -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue" request-channel="process-input"
    reply-channel="process-output" default-reply-destination="processOutputQueue"
    concurrent-consumers="5" max-concurrent-consumers="10"
    extract-reply-payload="true" correlation-key="JMSCorrelationID"
    extract-request-payload="true" max-messages-per-task="1" />

<int:service-activator id="jbpmServiceActivator"
    input-channel="process-input" ref="jbpmService" requires-reply="true"
    output-channel="process-output">
</int:service-activator>


<int:aggregator id="transactionAggregator"
    input-channel="aggregator-input" ref="processAggregator"
    output-channel="aggregator-output" message-store="processResultMessageStore"
    send-partial-result-on-expiry="false">
</int:aggregator>

<bean id="processResultMessageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="processResultMessageStoreReaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="processResultMessageStore" />
    <property name="timeout" value="5000" />
</bean>
<task:scheduled-tasks>
    <task:scheduled ref="processResultMessageStoreReaper"
        method="run" fixed-rate="1000" />
</task:scheduled-tasks>

<int:logging-channel-adapter id="logger"
    level="DEBUG" log-full-message="true" />

<int-stream:stdout-channel-adapter
    id="stdoutAdapter" channel="logger" />

我将 JMS 管道仅限于服务激活器,这是我最初想要的。

我基于上述方法的唯一问题是,即使我在多个 VMS 中使用它,我是否需要让我的聚合器由数据库支持(因为它前面的 JMS 网关确保它只接收具有有效相关 ID 的消息?)

问候,

【问题讨论】:

    标签: java spring-integration


    【解决方案1】:

    您可能不需要在每个组件之间使用 JMS。但是,我们有很多这样的链式网关的测试用例,并且一切正常。

    某些东西必须接线不正确。由于您没有展示您的完整配置,因此很难推测。

    请务必使用最新版本(2.2.4)并开启 DEBUG 日志记录并在流程中跟踪消息;只要您的消息有效负载可以跨 JMS 边界识别,就应该很容易找出哪里出了问题。

    【讨论】:

    • 感谢您的回复。我会提出建议。您是否碰巧知道由 JMS 支持的这种模式的任何示例。
    • 我通过将 JMS 队列仅保留给 Service Activator 来完成这项工作。但是,对于这种模式,我不确定是否应该使用数据库/缓存支持的聚合器(因为网关处理将消息响应与相同 ID 关联起来)。
    猜你喜欢
    • 1970-01-01
    • 2013-06-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多