【问题标题】:Spring Integration and RabbitMQ: no output-channel or replyChannel header available while receiving messageSpring Integration 和 RabbitMQ:接收消息时没有可用的 output-channel 或 replyChannel 标头
【发布时间】:2026-02-06 22:40:01
【问题描述】:

我编写了一个包含请求和回复的简单消息流。我必须使用两个独立的队列,所以我声明 AmqpOutboundAdapter 发送消息并声明 AmqpInboundAdapter 接收回复。

@Bean
@FindADUsers
public AmqpOutboundEndpoint newFindADUsersOutboundAdapter() {
    return Amqp.outboundAdapter(amqpTemplate())
            .routingKeyExpression("headers[" + ADUsersFindConfig.ROUTING_KEY_HEADER + "]")
            .exchangeName(getExchange())
            .headerMapper(amqpHeaderMapper())
            .get();
}

@Bean
public AmqpInboundChannelAdapter newFindADUsersResponseInboundChannelAdapter(
        ADUsersFindResponseConfig config) {
    return Amqp.inboundAdapter(rabbitConnectionFactory(), findADUsersResponseQueue)
            .headerMapper(amqpHeaderMapper())
            .outputChannel(config.newADUsersFindResponseOutputChannel())
            .get();
}

它应该与@MessagingGateway 一起使用:

@MessagingGateway
public interface ADUsersFindService {

     String FIND_AD_USERS_CHANNEL = "adUsersFindChannel";

     String FIND_AD_USERS_REPLY_OUTPUT_CHANNEL = "adUsersFindReplyOutputChannel";

     String FIND_AD_USERS_REPLY_CHANNEL = "adUsersFindReplyChannel";

     String CORRELATION_ID_REQUEST_HEADER = "correlation_id";

     String ROUTING_KEY_HEADER = "replyRoutingKey";

     String OBJECT_TYPE_HEADER = "object.type";

     @Gateway(requestChannel = FIND_AD_USERS_CHANNEL, replyChannel = FIND_AD_USERS_REPLY_CHANNEL)
ADResponse find(ADRequest adRequest, @Header(ROUTING_KEY_HEADER) String routingKey, @Header(OBJECT_TYPE_HEADER) String objectType);
}

ADUsersFindResponseConfig 类如下所示:

 @Configuration
 @Import(JsonConfig.class)
 public class ADUsersFindResponseConfig {

     @Autowired
     public NullChannel nullChannel;

     @Autowired
     private JsonObjectMapper<?, ?> mapper;

     /**
      * @return The output channel for the flow
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_OUTPUT_CHANNEL)
     public MessageChannel newADUsersFindResponseOutputChannel() {
         return MessageChannels.direct().get();
     }

     /**
      * @return The output channel for gateway
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_CHANNEL)
     public MessageChannel newADUsersFindResponseChannel() {
         return MessageChannels.direct().get();
     }

     @Bean
     public IntegrationFlow findADUsersResponseFlow() {
         return IntegrationFlows
                 .from(newADUsersFindResponseOutputChannel())
                 .transform(new JsonToObjectTransformer(ADResponse.class, mapper))
                 .channel(newADUsersFindResponseChannel())
                 .get();
     }
 }

发送消息正常,但接收消息时出现问题。我期望收到的消息将传递到名为 FIND_AD_USERS_REPLY_OUTPUT_CHANNEL 的通道,然后使用 findADUsersResponseFlow 将消息反序列化为 ADResponse 对象,然后将下一个 ADResponse 对象传递给网关 replyChannel - FIND_AD_USERS_REPLY_CHANNEL。最后,'find' 方法返回这个对象。不幸的是,当 org.springframework.integration.handler.BridgeHandler 收到一条消息时,我得到了异常:

 org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

消息日志如下:

 11:51:35.697 [SimpleAsyncTaskExecutor-1] INFO  New message - GenericMessage [payload={...somepayload...}, headers={correlation_id=7cbd958e-4b09-4e4c-ba8e-5ba574f3309a, replyRoutingKey=findADUsersResponse.ad, amqp_consumerQueue=findADUsersResponseQueue, history=newFindADUsersResponseInboundChannelAdapter,adUsersFindReplyOutputChannel,adUsersFindReplyChannel,infoLog,infoLoggerChain.channel#0,infoLoggerChain.channel#1, id=37a4735d-6983-d1ad-e0a1-b37dc17e48ef, amqp_consumerTag=amq.ctag-8Qs5YEun1jXYRf85Hu1URA, object.type=USER, timestamp=1469094695697}]

所以我很确定该消息已传递给 adUsersFindReplyChannel。此外(如果重要的话)请求消息和回复消息都将“replyTo”标头设置为空。我做错了什么?

【问题讨论】:

    标签: java spring rabbitmq spring-integration amqp


    【解决方案1】:

    replyChannel 标头是活动对象,不能通过 AMQP 序列化。

    您可以使用出站网关而不是一对适配器,框架将处理标头。

    如果你因为某种原因必须使用适配器,你需要做两件事:

    1. 使用header channel registry 将频道对象转换为在注册表中注册的字符串。

    2. 确保标头映射器配置为发送/接收 replyChannel 标头,并且您的接收系统在回复中返回标头。

    【讨论】:

    • 为什么要在标头中发送转换后的replyChannel?我想将两个适配器视为两个独立的数据流,在我的@Gateway 中,我想将数据传递给 OutboundAdapter(通过FIND_AD_USERS_CHANNEL 通道)并从 InboundAdapter 获取结果(通过 FIND_AD_USERS_REPLY_CHANNELfindADUsersResponseFlow 将 json 转换为响应对象)。我绝对不想将两个数据流捆绑在一起。
    • 这就是消息传递网关的工作方式——它在请求中放置一个回复通道标头,当回复发送到网关的回复通道时,它使用标头将回复路由到同一个线程。如果您丢失了标头,网关将无法知道哪个线程正在等待回复(回复通道桥接到请求的 replyChannel 标头,这是调用线程正在等待接收消息的地方。
    最近更新 更多