【问题标题】:Spring Integration DSL how to route split messages to different concurrent flows?Spring Integration DSL如何将拆分消息路由到不同的并发流?
【发布时间】:2020-11-24 15:28:54
【问题描述】:

我可能和其他人讨厌回答问题一样讨厌写菜鸟问题,但这里就是这样。

我需要根据负载中结果集的每一行中请求的操作,将从 JdbcPollingChannelAdapter 检索到的消息拆分为多条消息。

拆分操作很简单。被证明是一个挑战是有条件地将消息路由到一个流或另一个流。

经过多次尝试和错误,我相信这个流程代表了我的意图

                                                    /- insertUpdateAdapter -\
Poll Table -> decorate headers -> split -> router -<                         >- aggregator -> cleanup
                                                    \---- deleteAdapter ----/

为此,我构建了这个 Java DSL:

    final JdbcOutboundGateway inboundAdapter = createInboundAdapter();;
    final JdbcOutboundGateway deleteAdapter = createDeleteAdapter();
    final JdbcOutboundGateway insertUpdateAdapter = createInsertUpdateAdapter();
    
    return IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("start", "payload[0].get(\"start\")")
                    .headerExpression("end", "payload[0].get(\"end\")"))
            .handle(inboundAdapter)
            .split(insertDeleteSplitter)
            .enrichHeaders(h -> h.headerExpression("operation", "payload[0].get(\"operation\")"))
            .channel(c -> c.executor("stepTaskExecutor"))               
            .routeToRecipients (r -> r
                .recipientFlow("'I' == headers.operation or 'U' == headers.operation",
                            f -> f.handle(insertUpdateAdapter))

                // This element is complaining "Syntax error on token ")", ElidedSemicolonAndRightBrace expected"
                // Attempted to follow patterns from https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#routers
                .recipientFlow("'D' == headers.operation",
                            f -> f.handle(deleteAdapter))
                
                .defaultOutputToParentFlow())
                )
            .aggregate()
            .handle(cleanupAdapter)
            .get();

根据之前的工作,我做出的假设包括:

  1. 必要的频道会自动创建为直接频道
  2. Route To Recipients 是此功能的合适工具(我也考虑过表达式路由器,但如何添加子流的示例不如 Route To Recipients 清晰)

【问题讨论】:

    标签: spring spring-integration


    【解决方案1】:

    如果要并行运行拆分,请在拆分器和路由器之间的某处插入ExecutorChannel。您可以限制执行器的池大小来控制并发。

    【讨论】:

    • 我理解正确吗(参见编辑后的代码示例)?由于最终的 recipientFlow 元素上出现“ElidedSemicolonAndRightBrace expected”错误,我仍然无法编译。
    • .channel(MessageChannels.executor(stepTaskExecutor())。下一行有一个额外的)
    • 请注意,文档链接已过时,DSL 在 5.0 中已移至核心项目。 docs.spring.io/spring-integration/docs/current/reference/html/…
    【解决方案2】:

    .defaultOutputToParentFlow())后面多了一个括号

    更正后的代码是:

            return IntegrationFlows
                .from(setupAdapter,
                        c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
                .enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload[0].get(\"ALC_startTime\")")
                        .headerExpression("ALC_endTime", "payload[0].get(\"ALC_endTime\")"))
                .handle(inboundAdapter)
                .split(insertDeleteSplitter)
                .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get(\"ALC_operation\")"))
                .channel(c -> c.executor(stepTaskExecutor))
                .routeToRecipients (r -> r
                    .recipientFlow("'I' == headers.ALC_operation or 'U' == headers.ALC_operation",
                                f -> f.handle(insertUpdateAdapter))
    
                    // This element is complaining "Syntax error on token ")", ElidedSemicolonAndRightBrace expected"
                    // Attempted to follow patterns from https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#routers
                    .recipientFlow("'D' == headers.ALC_operation",
                                f -> f.handle(deleteAdapter))
                    .defaultOutputToParentFlow())
                .aggregate()
                .handle(cleanupAdapter)
                .get();
    

    【讨论】:

      猜你喜欢
      • 2015-06-17
      • 2017-12-23
      • 1970-01-01
      • 2018-04-15
      • 2021-10-03
      • 1970-01-01
      • 2023-04-09
      • 2016-01-27
      • 1970-01-01
      相关资源
      最近更新 更多