【Question title】: Spring XD filtering inside a processor module
【Posted】: 2016-05-05 09:54:05
【Question】:

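I want to create a module that filters messages from the input channel and transforms them into something else on the output. I know I can split this into two modules (I prefer Java code to scripts), for example: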

@Filter(inputChannel = "input", outputChannel = "output")
public boolean accept(final Message<?> message) {
    final MyObject payload = (MyObject) message.getPayload();
    return payload.getName().equals("test");
}


@Transformer(inputChannel = "input", outputChannel = "output")
public OtherObject transform(final MyObject data) {
    return convert(data);
}

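But I want to do this in a single module. If I move the filtering logic into the transformer module and return null for unacceptable payloads, I start getting Spring XD runtime exceptions. What is the right way to do this?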

--EDIT--

Configuration:

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel output() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel myChannel() {
        return new DirectChannel();
    }

    @Bean
    public MyFilter filter() {
        return new MyFilter();
    }

    @Bean
    public MyTransformer transformer() {
        return new MyTransformer();
    }
}

Filter:

@Filter(inputChannel = "input", outputChannel = "myChannel")
public boolean accept(final Message<?> message) 

Transformer:

@Transformer(inputChannel = "myChannel", outputChannel = "output")
public OtherObject transform(final MyObject payload)

Exception:

2016-05-13T11:17:59+0200 1.3.1.RELEASE WARN xdbus.tt.0-1 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'JavaConfiguredModule [name=myFilter, type=processor, group=tt, index=1 @7d48b140]:default,admin,singlenode,hsqldbServer:9393.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    ... 33 common frames omitted

【Comments】:

    Tags: java spring spring-integration spring-xd


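    【Solution 1】:

    As you have discovered, a transformer must return something.

    Please refer to the Spring Integration documentation (each XD processor module is a small Spring Integration application with an input and an output channel; a source has only an output, and a sink has only an input).

    As it stands, there are two consumers on input, so messages are distributed to them round-robin.

    You need to wire the two components into a single message flow (via a third message channel)...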

    @Filter(inputChannel = "input", outputChannel = "transformerChannel")
    public boolean accept(final Message<?> message) {
        final MyObject payload = (MyObject) message.getPayload();
        return payload.getName().equals("test");
    }
    
    
    @Transformer(inputChannel = "transformerChannel", outputChannel = "output")
    public OtherObject transform(final MyObject data) {
        return convert(data);
    }
    

    Note the channel configuration.
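
To see why this wiring removes the need to return null from the transformer, here is a library-free model of the flow. It is only a sketch: FilterThenTransform, its send and output methods, and the stand-in MyObject and OtherObject classes are hypothetical names for illustration, not Spring Integration or Spring XD API. The filter decides first, and a rejected payload is dropped before the transformer is ever called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of "input -> filter -> transformerChannel -> transformer -> output".
// All names here are illustrative assumptions, not Spring Integration API.
class FilterThenTransform {

    // Stand-ins for the question's payload types.
    static final class MyObject {
        final String name;
        MyObject(String name) { this.name = name; }
    }

    static final class OtherObject {
        final String label;
        OtherObject(String label) { this.label = label; }
    }

    private final Predicate<MyObject> filter;
    private final Function<MyObject, OtherObject> transformer;
    private final List<OtherObject> output = new ArrayList<>(); // models the "output" channel

    FilterThenTransform(Predicate<MyObject> filter,
                        Function<MyObject, OtherObject> transformer) {
        this.filter = filter;
        this.transformer = transformer;
    }

    // Models sending one payload to the "input" channel.
    void send(MyObject payload) {
        if (!filter.test(payload)) {
            return; // rejected: dropped here, the transformer is never invoked
        }
        // Accepted: handed straight to the transformer on the caller's thread.
        output.add(transformer.apply(payload));
    }

    List<OtherObject> output() {
        return output;
    }

    public static void main(String[] args) {
        FilterThenTransform flow = new FilterThenTransform(
                p -> "test".equals(p.name),
                p -> new OtherObject("converted:" + p.name));
        flow.send(new MyObject("test"));   // accepted and transformed
        flow.send(new MyObject("other"));  // filtered out
        System.out.println(flow.output().size());       // prints 1
        System.out.println(flow.output().get(0).label); // prints converted:test
    }
}
```

The transformer only ever sees payloads the filter accepted, so it always has something to return.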

    【Discussion】:

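    • Hi, thanks for your help. The modules are deployed with separate configurations, and I believe the messages are distributed correctly, but I will have to check. The point of my question, though, is whether it is possible to filter and transform the payload within a single module, without even having to put anything on the message bus? Thanks...
    • Either approach works. In the scenario above, the two components are connected directly through transformerChannel within a single module. If they are already configured and uploaded as separate modules, you can compose them into a single module; they will be wired together and bypass the bus. See the docs: module compose foo --definition "filter | transformer". You can then use foo as a module in stream definitions.
    • Yes, the bus thread that invokes the filter will (if the filter passes the message) invoke the transformer directly; they are connected with a DirectChannel, and no queue is involved. It is effectively the same as what I suggested above (put both components in one module and connect them with a DirectChannel).
    • "Dispatcher has no subscribers for channel ... input" means it is the filter that is not subscribed. Have you annotated the class containing the method with @MessageEndpoint (or @Component)? By the way, both methods can be in the same class, in which case you need only one @Bean.
    • Cool. FYI, in Spring Integration 4.3 (coming soon), we have removed the requirement for a class-level annotation on classes with annotated methods.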
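
Putting the comments together, a single-module version might look like the sketch below. It targets the Spring Integration 4.2 line seen in the stack trace, where the class-level @MessageEndpoint is still required. MyProcessor and the processor() bean name are placeholders introduced here; MyObject, OtherObject and convert(...) are the question's own types and helper, which the original post does not show, so this is a fragment to adapt rather than a complete program.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel output() {
        return new DirectChannel();
    }

    // Internal channel linking the filter to the transformer; never touches the bus.
    @Bean
    public MessageChannel transformerChannel() {
        return new DirectChannel();
    }

    // One bean is enough because both endpoint methods live in the same class.
    @Bean
    public MyProcessor processor() {
        return new MyProcessor();
    }
}

// Placeholder class name. The class-level annotation is what gets the methods
// subscribed to their input channels in Spring Integration versions before 4.3.
@MessageEndpoint
class MyProcessor {

    @Filter(inputChannel = "input", outputChannel = "transformerChannel")
    public boolean accept(final Message<?> message) {
        final Object payload = message.getPayload();
        // Reject anything that is not a MyObject named "test".
        return payload instanceof MyObject
                && "test".equals(((MyObject) payload).getName());
    }

    @Transformer(inputChannel = "transformerChannel", outputChannel = "output")
    public OtherObject transform(final MyObject data) {
        return convert(data); // the question's own conversion helper, not shown in the post
    }
}
```

Messages the filter rejects are discarded by default, so the transformer never needs to return null.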