【问题标题】:spring integration: multiple step multiple channel subscribersspring 集成:多步骤多渠道订阅者
【发布时间】:2016-01-12 19:00:15
【问题描述】:

我需要实现一个由多个步骤组成的集成流程,其中每个步骤都可以由可变数量的处理器(插件)执行。

到目前为止我所拥有的:

<!-- gateway -->
<int:gateway default-request-channel="step/1" service-interface="ServiceGateway">
    <int:method name="send" />
</int:gateway>


<!-- plugin 1 -->
<int:publish-subscribe-channel id="step/1" apply-sequence="true" />

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer" />
</int:service-activator>

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer2" />
</int:service-activator>


<!-- plugin 2 -->
<int:publish-subscribe-channel id="step/2" apply-sequence="true" />

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="Transformer3" />
</int:service-activator>

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="HttpTransformer4" />
</int:service-activator>

<!-- aggregation -->
<int:channel id="end" />
<int:aggregator input-channel="end" />

预期的行为如下:

  1. 通过网关发送第一个请求
  2. 输入由 2 个“step/1”插件处理
  3. “step/1”插件的每个输出都由“step/2”插件处理
  4. 聚合器应聚合 4 个项目 (1 -> 2 -> 4)

一切正常,但结果不是预期的,我只收到 2 个(随机)项目而不是 4 个。

我认为问题在于聚合器仅在两个项目后触发发布,因为“step/2”通道中的“apply-sequence”覆盖了“step/1”中的“apply-sequence”。所以问题是:如何让聚合器等待所有消息?

提前谢谢你。

自定义发布策略:

@SuppressWarnings("unchecked")
@Override
public boolean canRelease ( MessageGroup group ) {

    MessageHeaders headers = group.getOne ().getHeaders ();
    List<List<Object>> sequenceDetails = (List<List<Object>>) headers.get ( "sequenceDetails" );
    System.out.println ( sequenceDetails );

    int expectedSize = 1;
    //map message id, max group size reached (i.e. sequenceNumber==sequenceSize)
    for ( List<Object> sequenceItem : sequenceDetails ) {
        if ( sequenceItem.get ( 1 ) != sequenceItem.get ( 2 ) ) {
            System.err.println ( "--> AGG: no release check, group max not reached" );
            return false;
        }
        expectedSize *= (int) sequenceItem.get ( 2 );//multiplies the group sizes
    }

    int expectedSize2 = expectedSize * (int) headers.get ( "sequenceSize" );

    int currentSize = group.getMessages ().size () * expectedSize;
    System.err.println ( "--> AGG: " + expectedSize2 + " : " + currentSize );
    boolean canRelease = expectedSize2 == currentSize;
    if ( canRelease ) {
        System.out.println ( "ok" );
    }
    return canRelease;
}

打印出来:

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG:没有发布检查,未达到组最大值

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG:没有发布检查,未达到组最大值

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> AGG: 4 : 2

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> AGG: 4 : 4

聚合代码:

@Aggregator
public Object aggregate ( List<Message<?>> objects ) {

    List<Object> res = new ArrayList<> ();
    for ( Message<?> m : objects ) {
        res.add ( m.getPayload () );
        MessageHeaders headers2 = m.getHeaders ();
        System.out.println ( headers2.get ( "history" ) );
    }

    return res;
}

打印出来:

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,结束2

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,结束2

[102, 202] --> 最终结果列表,预计由4个项目组成

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    使用自定义发布策略。来自第一个 pubsub 的相关数据由第二个 pubsub 推送到 sequenceDetails 标头中的堆栈。

    编辑

    问题是有两个组;您需要关联初始的correlationId。这是一个纯 SpEL 解决方案;使用自定义关联/发布策略来确保数据符合预期可能更安全(并使用getOne() 而不是迭代器)...

    <int:aggregator input-channel="end2"
            correlation-strategy-expression=
               "headers['sequenceDetails'][0][0]"
            release-strategy-expression=
               "size() == iterator().next().headers['sequenceSize'] * iterator().next().headers['sequenceDetails'][0][2]" />
    

    【讨论】:

    • 嗨,Gary,我已经实现了自定义 ReleaseStrategy,现在在序列中正确数量的项目 (4) 处触发了发布。现在的问题是输出仍然由 2 个项目组成。实现自定义聚合器我已经看到,即使发布策略在 4 个项目上触发发布,聚合方法(在 4 条消息通过 canRelease 方法后正确调用)接收到 2 个项目的消息列表(两者中只有一个制作组)。
    • 这对我来说没有任何意义;如果发布策略看到一个包含 4 条消息的组,您将获得所有 4 条消息进行聚合。你的评论only one of the two released groups 让我很困惑;应该有一组有 4 条消息(sequenceSizesequenceNumber 标题将是伪造的)。你能显示你的ReleaseStrategy吗? (编辑您的问题,不要尝试将其粘贴到评论中)。
    • 我添加了发布策略和聚合两种方法
    • 这看起来不对 int currentSize = group.getMessages ().size () * expectedSize; - 我认为你只需要像 return group.size() == sequenceSize * nestedSequenceSize 这样的东西。
    • 我把测试代码贴在github
    猜你喜欢
    • 1970-01-01
    • 2019-10-04
    • 2012-09-16
    • 2015-11-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-23
    • 2013-09-25
    相关资源
    最近更新 更多