【Question title】: DestinationResolutionException: no output-channel or replyChannel header available
【Posted】: 2017-07-23 17:46:10
【Question】:

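I am implementing a flow that uses a MongoDbMessageSource to fetch a list of users, and I want to process each document in parallel. To do that, I rely on the default behavior of split().

However, after the split I get the following error: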

o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=UserEntity{id=5974dfe53681ac160c78dc0f, firstName=David, lastName=García, age=14, socialMedia=[]}, headers={sequenceNumber=4, correlationId=8f8f7b7a-832a-8942-1922-26b6a7529091, id=bb373e42-d59c-42e6-d221-68bf1f56fec3, mongo_collectionName=users, sequenceSize=5, timestamp=1500831727759}], headers={id=9187ffcd-8c79-eb1e-8791-1cfe558ab134, timestamp=1500831727762}]

The code is as follows:

@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or 
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * 
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }


    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .split()
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .handle((GenericHandler<UserEntity>) (payload, headers) -> {
                    logger.debug("user:" + payload + " on thread "
                        + Thread.currentThread().getName());
                    return payload;
                })
                .aggregate()
                .get();
    }

}

Does anyone know what I am doing wrong? Thanks in advance.

Following Barath's suggestion, I switched to a MessageHandler:

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .split()
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .wireTap(sf -> sf.handle(user -> logger.debug("user:" + user.getPayload().toString() + " on thread " + Thread.currentThread().getName())))
                .aggregate()
                .get();
    }

The error persists. The full stack trace is below:

2017-07-23 21:46:36.785 DEBUG 15148 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=UserEntity{id=5974fd123681ac3b2c5c343a, firstName=David, lastName=García, age=14, socialMedia=[]}, headers={sequenceNumber=4, correlationId=da7be297-992b-8f5a-d41c-58a89e654fcc, id=eb55d6bf-8108-4be5-8b32-6260d4ceea9b, mongo_collectionName=users, sequenceSize=5, timestamp=1500839196770}], headers={id=83a833cd-ac87-f6be-fb75-8d6907e3d194, timestamp=1500839196784}]
2017-07-23 21:46:36.788 ERROR 15148 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=UserEntity{id=5974fd123681ac3b2c5c343a, firstName=David, lastName=García, age=14, socialMedia=[]}, headers={sequenceNumber=4, correlationId=da7be297-992b-8f5a-d41c-58a89e654fcc, id=eb55d6bf-8108-4be5-8b32-6260d4ceea9b, mongo_collectionName=users, sequenceSize=5, timestamp=1500839196770}]
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:53)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$3.run(UnicastingDispatcher.java:129)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:353)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:671)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 7 more

2017-07-23 21:46:36.788 DEBUG 15148 --- [ taskExecutor-4] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=UserEntity{id=5974fd123681ac3b2c5c343a, firstName=David, lastName=García, age=14, socialMedia=[]}, headers={sequenceNumber=4, correlationId=da7be297-992b-8f5a-d41c-58a89e654fcc, id=eb55d6bf-8108-4be5-8b32-6260d4ceea9b, mongo_collectionName=users, sequenceSize=5, timestamp=1500839196770}], headers={id=83a833cd-ac87-f6be-fb75-8d6907e3d194, timestamp=1500839196784}]

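【Comments】:

  • Have you tried a MessageHandler instead of a GenericHandler, so that it is handled as a Message? I think the channel's output is a Message rather than a UserEntity. Please correct me if I'm wrong!
  • Thanks for your reply. I have updated the post according to your suggestion, but the error persists.

Tags: spring spring-boot spring-integration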


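【Solution 1】:

The problem is that the Aggregator is a request-reply component, but nothing follows it in your flow. Once a group is complete, the aggregator tries to send the aggregated message onward, finds neither an output channel nor a replyChannel header, and throws the DestinationResolutionException (note completeGroup and sendOutput in the stack trace). You have to decide what to do with the aggregator result.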
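
As a minimal sketch of one way to do that, assuming the aggregated list only needs to be logged: the bean below is intended as a replacement for processUsers inside the InfrastructureConfiguration class from the question, so it reuses that class's imports, logger, taskExecutor() and mongoMessageSource(). The only change is the final one-way handle(...) after aggregate(). The log text "aggregated users:" is purely illustrative.

```java
@Bean
@Autowired
public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
    return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
            .split()
            .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
            .handle((GenericHandler<UserEntity>) (payload, headers) -> {
                logger.debug("user:" + payload + " on thread "
                        + Thread.currentThread().getName());
                return payload;
            })
            .aggregate()
            // One-way MessageHandler placed after the aggregator: it consumes the
            // aggregated message and returns nothing, so the aggregator now has an
            // output channel and no replyChannel header is required.
            .handle(message -> logger.debug("aggregated users: " + message.getPayload()))
            .get();
}
```

If the aggregated result is not needed at all, ending the flow with .channel("nullChannel") instead of the last handle(...) should discard it. The wireTap variant from the question fails for the same reason as the original: the aggregator is still the last component in the flow.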
