【Question title】: Configure Spring Integration Aggregator via Java DSL under JDK 7
【Posted】: 2017-07-17 07:16:19
【Question】:

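This is my first time configuring Spring Integration through the DSL under Java 7. As is well known, lambda expressions are only available from Java 8 onward, so I followed the example "Spring Integration Java DSL (pre Java 8): Line by line tutorial" and wrote the configuration below. It is meant to collect every 100 messages that share the same resource and send them to a remote RESTful service.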

@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
                                          @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {

    return IntegrationFlows.from("rawStringParsingRequestChannel")
                           .transform(new RawStringToCheckDataMessageTransformer())
                           .transform(new DataMessageToDtoTransformer())
                           .aggregate(new Consumer<AggregatorSpec>(){

                                @Override public void accept(AggregatorSpec aggregatorSpec) {
                                    aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)
                                                  .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                                                  .releaseStrategy(new MessageCountReleaseStrategy(100))
                                                  .sendPartialResultOnExpiry(true)
                                                  .groupTimeoutExpression("60000") ;
                                }
                           })
                           .transform(headerEnricher)
                           .transform(new ObjectToJsonTransformer())
                           .handle(httpOutboundAdapter)
                           .get();
}

However, this configuration does not work for me; it throws the following exception.

Exception in thread "main" java.lang.IllegalStateException: Failed to process message list
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:79)
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:86)
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:84)
    at org.springframework.integration.dsl.AggregatorSpec$MessageGroupProcessorWrapper.processMessageGroup(AggregatorSpec.java:127)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:665)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:392)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy45.sendRawData(Unknown Source)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:82)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:68)
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:697)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.retrieveHistoricData(HistoricDataRetriever.java:92)
    at prototype.healthcloud.historic.data.pusher.Application.main(Application.java:119)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:640)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:211)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
    at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:154)
    at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:71)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:66)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:319)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:160)
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:73)
    ... 61 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:636)
    ... 74 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
    at org.springframework.util.Assert.state(Assert.java:70)
    at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:920)
    ... 79 more

The root cause is in the generateExpression method of o.s.i.u.MessagingMethodInvokerHelper$HandlerMethod: annotationType is null, and the parameter type o.s.i.s.MessageGroup is neither a sub-interface of Collection<Message<?>> nor an array, so the expression is set to '#target.processMessageGroup(payload)'. I guess an extra block of logic would have to be added to handle the MessageGroup type properly (not sure).

Since my aggregation logic is very simple, I found a workaround by specifying an outputExpression, as shown below.

@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
                                          @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {

    return IntegrationFlows.from("rawStringParsingRequestChannel")
                           .transform(new RawStringToCheckDataMessageTransformer())
                           .transform(new DataMessageToDtoTransformer())
                           .aggregate(new Consumer<AggregatorSpec>(){

                                @Override public void accept(AggregatorSpec aggregatorSpec) {
                                    aggregatorSpec.outputExpression("#this.![payload]")
                                                  .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                                                  .releaseStrategy(new MessageCountReleaseStrategy(100))
                                                  .sendPartialResultOnExpiry(true)
                                                  .groupTimeoutExpression("60000") ;
                                }
                           })
                           .transform(headerEnricher)
                           .transform(new ObjectToJsonTransformer())
                           .handle(httpOutboundAdapter)
                           .get();
}
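
To make clear what the workaround is configured to do, here is a minimal plain-Java sketch of the same behaviour: messages are grouped by their "resource" value, a group is released once it reaches a count threshold, and the output is the list of payloads (which is what the projection `#this.![payload]` yields). `ResourceBatchingSketch` and `SimpleMessage` are hypothetical names made up for illustration, not Spring classes, and the group timeout and partial release on expiry are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the aggregator settings above; none of these types are Spring classes.
class ResourceBatchingSketch {

    /** Hypothetical stand-in for a message with a "resource" header and a payload. */
    static final class SimpleMessage {
        final String resource;
        final Object payload;

        SimpleMessage(String resource, Object payload) {
            this.resource = resource;
            this.payload = payload;
        }
    }

    private final int threshold;
    // One open group per correlation key (the "resource" value).
    private final Map<String, List<SimpleMessage>> groups =
            new HashMap<String, List<SimpleMessage>>();

    ResourceBatchingSketch(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Adds a message to its group. Returns the list of payloads once the group
     * reaches the threshold (count-based release), otherwise null.
     */
    List<Object> accept(SimpleMessage message) {
        List<SimpleMessage> group = groups.get(message.resource);
        if (group == null) {
            group = new ArrayList<SimpleMessage>();
            groups.put(message.resource, group);
        }
        group.add(message);
        if (group.size() < threshold) {
            return null;
        }
        groups.remove(message.resource);
        return projectPayloads(group);
    }

    /** Models the projection "#this.![payload]": one payload per message, in order. */
    static List<Object> projectPayloads(List<SimpleMessage> messages) {
        List<Object> payloads = new ArrayList<Object>(messages.size());
        for (SimpleMessage m : messages) {
            payloads.add(m.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        ResourceBatchingSketch batcher = new ResourceBatchingSketch(3);
        batcher.accept(new SimpleMessage("a", 1));
        batcher.accept(new SimpleMessage("b", 10));
        batcher.accept(new SimpleMessage("a", 2));
        // The third "a" message completes that group and releases its payloads.
        System.out.println(batcher.accept(new SimpleMessage("a", 3)));
    }
}
```

In the real flow the threshold would be 100 and the framework, not this class, owns the group store.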

So far the workaround does the job for me, but my question is: how should the processor be configured when the aggregation logic is complex?

【Comments】:

  • The Spring Integration version is 4.3.10.RELEASE

Tags: spring-integration java-7 spring-integration-dsl


【Answer 1】:

aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)

You can't use a concrete MessageGroupProcessor with that method; it expects a POJO bean and a method name (which can be null if the bean has only one eligible method).

Use

aggregatorSpec.outputProcessor(new SimpleMessageGroupProcessor())

Note that the output of that processor will be the collection of messages in the group, which might not be what you want.

You might want to consider DefaultAggregatingMessageGroupProcessor instead (it is the default if you don't supply an outputProcessor).
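
For more complex aggregation logic, one option is a custom MessageGroupProcessor passed to outputProcessor(...). The following is only a sketch, assuming Spring Integration 4.3.x with the 1.2.x Java DSL on the classpath; `AggregatorSketchConfig`, `PayloadListProcessor`, and the channel names `aggregatorInputChannel` and `aggregatedOutputChannel` are hypothetical names chosen for illustration, and the body of aggregatePayloads is where the real logic would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.dsl.AggregatorSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;

@Configuration
public class AggregatorSketchConfig {

    /** Hypothetical processor: the custom aggregation logic lives in aggregatePayloads. */
    static class PayloadListProcessor extends AbstractAggregatingMessageGroupProcessor {

        @Override
        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
            // Placeholder logic: collect the payloads; replace with whatever the use case needs.
            List<Object> payloads = new ArrayList<Object>();
            for (Message<?> message : group.getMessages()) {
                payloads.add(message.getPayload());
            }
            return payloads;
        }
    }

    @Bean
    public IntegrationFlow aggregatingFlowSketch() {
        return IntegrationFlows.from("aggregatorInputChannel") // hypothetical channel name
                .aggregate(new Consumer<AggregatorSpec>() {

                    @Override
                    public void accept(AggregatorSpec spec) {
                        // outputProcessor(...) takes a concrete MessageGroupProcessor;
                        // processor(...) is for a POJO plus a method name.
                        spec.outputProcessor(new PayloadListProcessor())
                            .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("60000");
                    }
                })
                .channel("aggregatedOutputChannel") // hypothetical channel name
                .get();
    }
}
```

The correlation, release, and timeout settings are copied from the question; only the processor wiring differs.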

【Comments】:

  • Neither SimpleMessageGroupProcessor nor DefaultAggregatingMessageGroupProcessor works for me without a method name; both throw the same exception.
  • You're right, a POJO aggregator works fine, although it looks something like public class PayloadExtractingAggregator {@Aggregator public <T extends AbstractDtoBase> List<T> extract(List<T> dtos) {return dtos;}} and the DSL is aggregatorSpec.processor(payloadExtractingAggregator(), "extract")
  • Don't put code in comments; it's not very readable. Edit the question (or answer) instead. Perhaps you misread my answer? When using a concrete MessageGroupProcessor, the method is .outputProcessor(...), not .processor(...)
  • Sorry, I missed your point. It's clear to me now. Thanks a lot.