【问题标题】:Spring Integration - Who exactly "iterates" the splitted messages from a splitter?Spring Integration - 谁从拆分器中“迭代”拆分的消息?
【发布时间】:2016-01-19 20:10:01
【问题描述】:

关于 SI 中的 splitter 元素,我有几个基本问​​题。

我知道要形成自定义的拆分器逻辑,我们需要扩展 AbstractMessageSplitter 并覆盖 splitMessage 方法。

集合随后将显示在splitteroutput-channel 上(假设没有为传入消息配置回复通道)。

splitteroutput-channel 可以随后变为 input-channel 用于另一个 si 组件。假设该组件是一个简单的service-activator

手册说集合的“每个”消息将由该集合呈现给的下游配置处理。因此,在我们的示例中,service-activator 将处理“每个”消息。

1) 我的查询是“”准确地迭代集合并在输入通道上显示来自集合的“每个”消息服务激活器?

2) 假设我们在这里涉及所有直接渠道。现在如果service-activator 的output-channel 被配置为nullChannel,一旦成功处理第一条消息,它是否仍会从集合中显示“next”消息?

3) 假设这里没有aggregator。还让我们假设所有涉及的方法都有非 void 返回。 向拆分器的 output-channel 呈现“collection”消息的调用者线程将返回什么?它会返回服务激活器返回的返回类型的集合吗?还是只会从集合中返回最后处理的消息?

我希望我的问题足够详细,以便澄清。

非常感谢您的回复。

最好的问候

【问题讨论】:

    标签: null iteration spring-integration channel splitter


    【解决方案1】:

    是的。你问的问题很好!

    1)AbstractMessageProducingHandler:

    protected void sendOutputs(Object result, Message<?> requestMessage) {
        if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
            for (Object o : (Iterable<?>) result) {
                this.produceOutput(o, requestMessage);
            }
        }
        else if (result != null) {
            this.produceOutput(result, requestMessage);
        }
    }
    

    正是从这里每个项目被发送到output-channel

    2) 是的,如果所有内容都在DirectChannel 上,则下一条消息将在上一条完成之前不会发送。实际上它的工作方式与原始 java 中的相同:

     for (Object o : collection) {
    
     }
    

    注意:在这种sync 的情况下,如果当前消息处理失败,则不会发送下一个消息。因为Exception。与原始 Java 完全一样 :-)。

    3) 好吧。使用原始 Java,它可能看起来像:

    split(Collection<?>, Future<?>);  
     --->>
     for (Object o : collection) {
         process(Object, Future<?>)
     }
    

    这个过程正是在splitter 中完成的,甚至在每个 EIP 组件中都做得更好。实际上它们都只在 push 模式下工作。即使它对回复做了一些事情,它实际上也只是推送到replyChannel

    因此,如果您不关心回复过程,则只有第一条消息会填充对Future&lt;?&gt; 的回复——在我们的例子中,回复是发送给replyChannel。是的,所有其他人都可以这样做,但是回复将被忽略。请参阅TemporaryReplyChannel 源代码。

    注意,如果您的下游拆分器进程是async,例如ExecutorChannel 作为output-channel 上的&lt;splitter&gt; 没有预测哪条消息将在回复中有罪。因此,为了让您最满意地发送 Thread,您应该围绕这些拆分消息找出一些足够的算法。甚至可能使用&lt;aggregator&gt;

    我认为您最后一个问题的答案取决于目标业务逻辑要求。

    【讨论】:

    • 我的业务用例是我创建了一个以服务激活器开头的 SI 组件的“util-kind-of”配置。当一条消息被提交给这个服务激活器的输入通道时,它通过组件的下游处理消息,然后最终将消息有效负载存放到一个 MQ 上。这种“util-kind-of”配置的结束通道是 nullChannel。我的上游配置正在生成一个消息列表。我的要求是通过“util-kind-of”配置按顺序处理这些消息。
    • 我打算在上游配置和“util-kind-of”配置之间使用拆分器,但我不确定 1)如何确保处理顺序(比如说“自然”顺序;意思是与在列表中插入消息的顺序相同;这将取决于我正在使用的列表的实现吗?2)由于“util-kind-of”配置的终点是 nullChannel,我不是确定控件是否会返回调用者线程来处理列表中的下一条消息。
    • 对于我的用例,我不需要聚合器。我只需要拆分消息,按顺序处理它们。
    • 如果您的有效负载已经是一个集合,它会按原样迭代而无需修改。 nullChannel 只是假设与单向出站通道适配器相同。只需找出没有 SI 的相同过程!似曾相识,但这是事实,即使 SI 也受到其语言规则的限制,甚至更多:与语言中类似组件的工作方式相同
    • 所以如果我从“util-kind-of”配置的最后一个元素中删除 output-channel="nullChannel" 并且因为我不关心这个“ util-kind-of”配置,这对我的用例有用吗?或者它会导致任何线程被占用(对于同步情况)?
    猜你喜欢
    • 1970-01-01
    • 2023-04-09
    • 2011-10-21
    • 1970-01-01
    • 1970-01-01
    • 2020-03-27
    • 1970-01-01
    • 1970-01-01
    • 2022-01-21
    相关资源
    最近更新 更多