【问题标题】:Spring Integration Aggregator is sending expired/timed out messages to the wrong channel in a specific scenarioSpring Integration Aggregator 在特定场景中将过期/超时消息发送到错误的通道
【发布时间】:2024-01-18 07:42:01
【问题描述】:

我将消息发送到聚合器的输入通道,然后聚合器将聚合的消息释放到输出通道。聚合器至少需要 2 条消息(用于聚合),否则等待 10 秒以等待超时。我也在使用 jdbc 消息存储。

以下是我测试过的场景。

场景 1 运行良好

发送消息 1 和 2 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

场景 2 运行良好

发送消息 1 和 2 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output2)

场景 3 运行良好

仅发送消息 1 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

场景 4 失败,因为它不是将过期消息发送到 output2,而是发送到 output1

仅发送消息 1 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output1)

谁能提出方案 4 失败的原因?

以下是我的配置

<int:service-activator ref="activator" method="output1_activator" input-channel="output1" /> 

<int:service-activator ref="activator" method="output2_activator" input-channel="output2" /> 

<int:aggregator input-channel="input1" 
    output-channel="output1" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" /> 

<int:aggregator input-channel="input2" 
    output-channel="output2" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" />

<bean id="aggregatorJdbcDataSource" class="o.s.j.d.DriverManagerDataSource"> ..... </bean> 

<bean id="myJdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore"> 
    <constructor-arg index="0" ref="aggregatorJdbcDataSource" /> 
</bean> 

<bean id="telMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">     <property name="messageGroupStore" ref="myJdbcMessageStore" /> 
    <property name="timeout" value="10000" /> 
</bean> 

<task:scheduled-tasks> 
    <task:scheduled ref="telMessageStoreReaper" method="run" fixed-rate="5000" /> 
</task:scheduled-tasks>

【问题讨论】:

  • 你需要展示你的配置。
  • 您需要使用格式正确的配置编辑您的问题并删除这些 cmets;这种格式不可读。
  • 感谢 Gary,现在它按预期工作了。

标签: spring-integration message store aggregator


【解决方案1】:

您不能为两个聚合器使用相同的消息存储实例。收割者不知道哪个聚合器拥有该组。

您可以使用相同的表,但需要单独的消息存储实例;见partitioning a message store

你需要两家商店,每家都有不同的地区。

您也可以考虑使用group-timeout 代替收割者。

请将您的配置从 cmets 移至主要问题,以便其他人更容易阅读。

【讨论】:

    最近更新 更多