【发布时间】:2020-11-24 15:28:54
【问题描述】:
我可能和其他人讨厌回答问题一样讨厌写菜鸟问题,但这里就是这样。
我需要根据负载中结果集的每一行中请求的操作,将从 JdbcPollingChannelAdapter 检索到的消息拆分为多条消息。
拆分操作很简单。被证明是一个挑战是有条件地将消息路由到一个流或另一个流。
经过多次尝试和错误,我相信这个流程代表了我的意图
/- insertUpdateAdapter -\
Poll Table -> decorate headers -> split -> router -< >- aggregator -> cleanup
\---- deleteAdapter ----/
为此,我构建了这个 Java DSL:
final JdbcOutboundGateway inboundAdapter = createInboundAdapter();;
final JdbcOutboundGateway deleteAdapter = createDeleteAdapter();
final JdbcOutboundGateway insertUpdateAdapter = createInsertUpdateAdapter();
return IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("start", "payload[0].get(\"start\")")
.headerExpression("end", "payload[0].get(\"end\")"))
.handle(inboundAdapter)
.split(insertDeleteSplitter)
.enrichHeaders(h -> h.headerExpression("operation", "payload[0].get(\"operation\")"))
.channel(c -> c.executor("stepTaskExecutor"))
.routeToRecipients (r -> r
.recipientFlow("'I' == headers.operation or 'U' == headers.operation",
f -> f.handle(insertUpdateAdapter))
// This element is complaining "Syntax error on token ")", ElidedSemicolonAndRightBrace expected"
// Attempted to follow patterns from https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#routers
.recipientFlow("'D' == headers.operation",
f -> f.handle(deleteAdapter))
.defaultOutputToParentFlow())
)
.aggregate()
.handle(cleanupAdapter)
.get();
根据之前的工作,我做出的假设包括:
- 必要的频道会自动创建为直接频道
- Route To Recipients 是此功能的合适工具(我也考虑过表达式路由器,但如何添加子流的示例不如 Route To Recipients 清晰)
【问题讨论】: