【问题标题】:Spring Integration Channel Chaining WeirdnessSpring集成通道链接怪异
【发布时间】:2015-11-16 15:40:17
【问题描述】:

我可能只是在这里遗漏了一些非常简单的东西(或滥用了一些东西),但我试图设置两个直接通道,以便一个流将一些数据依次传递给每个通道。所以使用 Spring Integration JAVA DSL 我有这样的事情(对于这个例子进行了显着简化):

   public static final String TEST_CHANNEL = "testGateway";
public static final String TEST_UPPER_CHANNEL = "testChannelUpper";
public static final String TEST_LOWER_CHANNEL = "testChannelLower";

@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.direct(TEST_CHANNEL).get();
}

@Bean(name = TEST_UPPER_CHANNEL)
public MessageChannel testChannelUpperChannel() {
    return MessageChannels.direct(TEST_UPPER_CHANNEL).get();
}

@Bean(name = TEST_LOWER_CHANNEL)
public MessageChannel testChannelLowerChannel() {
    return MessageChannels.direct(TEST_LOWER_CHANNEL).get();
}


@Bean
public IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .channel(TEST_UPPER_CHANNEL)
            .channel(TEST_LOWER_CHANNEL)
            .get();
}


@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_UPPER_CHANNEL)
            .<String, String>transform(String::toUpperCase)
            .handle(System.out::println)
            .get();
}

@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_LOWER_CHANNEL)
            .<String, String>transform(String::toLowerCase)
            .handle(System.out::println)
            .get();
}

我正在使用 REST 端点通过网关调用流,但是当我这样做时,似乎只调用了一个通道。通道似乎在调用中也是随机的(有时会转到 testChannelUpper,有时会转到 testChannelLower)。

我基本上在执行过程中都遇到了这个问题: (每次我只是点击这个端点http://localhost:9090/test?test=HellOoi

执行 1: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=4aa7b075-23cc-6ab3-10a1-c7cb73bae49b, timestamp=1447686848477} ]

执行 2: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=a18dcd01-da18-b00d-30c0-e1a03ce19104, timestamp=1447686853549} ]

执行 3: GenericMessage [payload=hellooi, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testLowerFlow.channel#0, id=5f0abcb9-378e-7a3c-9c93-a04ff6352927, timestamp=1447686857545} ]

我相信我在这里尝试的内容也显示在 DSL wiki 的 channelFlow 示例中: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference

Sooo 我正在使用的规格是:

Spring Boot v1.2.2.RELEASE

Spring v4.1.5.RELEASE

spring-integration-java-dsl 1.0.2.RELEASE

JDK 1.8.0_40-b25

那么...有其他人见过这种行为吗?我只是在滥用渠道实施吗?还有其他想法吗?提前致谢!


正如 Gary 指出的那样,最好的方法是拥有一个 pub-sub 并在此命令消费者:

    @Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.publishSubscribe(TEST_CHANNEL).get();
}


@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toUpperCase, e -> e.order(1))
            .handle(System.out::println)
            .get();
}

@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toLowerCase, e -> e.order(2))
            .handle(System.out::println)
            .get();
}

【问题讨论】:

    标签: java spring spring-boot spring-integration


    【解决方案1】:

    这样做的目的是什么……

    @Bean
    public IntegrationFlow testFlow() {
        return IntegrationFlows
            .from(TEST_CHANNEL).fixedSubscriberChannel()
            .channel(TEST_UPPER_CHANNEL)
            .channel(TEST_LOWER_CHANNEL)
            .get();
    }
    

    ?

    所做的只是将三个渠道连接在一起。

    事实上,您最终在 TEST_UPPER_CHANNEL 上有 2 个消费者 - 此流程中的桥接器和其他流程中的转换器。

    默认情况下,直接渠道中的调度使用循环分发。因此,第一条消息将发送到桥接器,然后发送到变压器,等等。

    【讨论】:

    • Hey Gary... 我的目的是让消息传送到两个频道 (TEST_CHANNEL -> TEST_UPPER_CHANNEL -> TEST_LOWER_CHANNEL)。我不确定我是否将问题与 fixedSubscriberChannel 混淆了,但这不是必需的,我将其从帖子中删除。
    • 那是不对的;您可以简单地将TEST_CHANNEL 更改为MessageChannels.publishSubscribe().get(或return new PublishSubscribeChannel());然后只需将每个流更改为from(TEST_CHANNEL);消除testFlow()
    • 是的。我知道你在做什么,但我希望这些频道按顺序执行。我对多通道设置工作方式的假设显然是错误的。 :( 不过我会弄清楚的。感谢您的意见!
    • 它们会按顺序执行——订阅顺序;您可以通过向处理程序端点添加 order 属性来使其明确... .&lt;String, String&gt;transform(String::toUpperCase, e -&gt; e.order(1)), ... .&lt;String, String&gt;transform(String::toLowerCase, e -&gt; e.order(2))
    猜你喜欢
    • 2015-06-03
    • 1970-01-01
    • 1970-01-01
    • 2012-10-29
    • 1970-01-01
    • 1970-01-01
    • 2016-06-02
    • 2017-06-22
    • 2020-01-12
    相关资源
    最近更新 更多