【发布时间】:2018-05-10 11:25:10
【问题描述】:
我有一个这样定义的 Spring 集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.acknowledgeMode(MANUAL)
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
还有一个这样定义的服务激活器:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(List<> Collection<MyEvent> events) {
....
}
}
我要做的是更改myMethod 以获取与聚合中的每条消息关联的特定标头列表。在我的情况下,我想为每条消息保留AmqpHeaders.CHANNEL 和AmqpHeaders.DELIVERY_TAG,以便我可以对发送给 RabbitMQ 的每条消息执行 ACK 或 NACK(请注意,我在IntegrationFlow,因为我想在myMethod 执行之后发送ACK/NACK)。
例如,我尝试过这种方法:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
Collection<MyEvent> events) {
....
}
}
但在这里我似乎只获得了最后一条消息的标头值(即 channels 和 tags 的大小始终为 1,即使 events 集合中有多个事件)。
我还尝试将 Collection<MyEvent> 更改为 Collection<Message> (org.springframework.messaging.Message) 以尝试手动提取标头,但这失败了:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
因为消息已经被 IntegrationFlow 中定义的消息转换器转换。
我怎样才能做到这一点?
【问题讨论】:
标签: java spring spring-integration spring-integration-amqp