【Posted】: 2020-02-06 14:33:45
【Question】:
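I am trying to create a reactive stream (Flux) from a JMS queue using Spring WebFlux and Spring Integration.
The queue is an IBM MQ queue accessed through Spring Integration, and the goal is for clients to receive the JMS messages asynchronously over the reactive stream. I believe everything is wired up correctly, because the messages are being consumed by the reactive listener. However, my reactive Flux stream never shows those messages. Any help would be appreciated.
Here is the code I use to make my JMS listener reactive: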
UmGateway
@Named
@Slf4j
public class UmGateway {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${um.mq.queueName}")
    private String queueName;

    @Bean
    public Publisher<Message<MilestoneNotification>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination(queueName))
                .channel(MessageChannels.queue())
                .log(Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    public Flux<MilestoneNotification> findAll() {
        return Flux.from(jmsReactiveSource())
                .map(Message::getPayload);
    }

    /**
     * Method which sends Milestone Notifications to the UM Queue.
     */
    public void send(final MilestoneNotification message) {
        jmsTemplate.convertAndSend(queueName, message);
    }
}
Controller
@RestController
@Slf4j
@RequiredArgsConstructor
@RequestMapping(ApiConstants.MILESTONE_UM)
public class MilestoneUmController {

    @Autowired
    private UmGateway umGateway;

    @RequestMapping(value = "/message", method = RequestMethod.POST)
    public ResponseEntity<Boolean> sendMessage(
            final @RequestBody MilestoneNotification notification) {
        umGateway.send(notification);
        return new ResponseEntity<>(HttpStatus.OK);
    }

    @GetMapping(path = "/milestone-notification/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<MilestoneNotification> feed() {
        return this.umGateway.findAll();
    }
}
Here are the logs:
- 2020.02.06 13:53:04.900 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","NotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997184878, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13530511, id=5d277be2-49f5-3e5d-8916-5793db3b76e7, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e03738521, timestamp=1580997184900}]
- 2020.02.06 13:53:04.968 [qtp2132762784-23] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:53:53.521 [qtp2132762784-18] INFO c.j.w.p.u.RequestLoggingInterceptor - Received GET request to /common/dataservice/di/milestone/um/milestone-notification/stream with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.070 [qtp2132762784-16] INFO c.j.w.p.u.RequestLoggingInterceptor - Received POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.541 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","diNotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997249519, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13540975, id=5421898e-5ef6-1f9b-aaa6-81ebc7668f50, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e08738521, timestamp=1580997249541}]
- 2020.02.06 13:54:09.593 [qtp2132762784-16] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
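【Comments】:
- Consider using Flux<String> instead of the MilestoneNotification POJO.
- Thanks for the reply, @ArtemBilan. I tried it the way you suggested, but my Flux stream still gets no response. Could it be because my Flux stream runs on one thread and the JMS listener on another? Thread of the Flux stream: qtp2132762784-18. Thread of the JMS listener: jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1.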
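On the threading question in the comment above: Reactive Streams signals routinely cross thread boundaries, so a publisher fed from a listener thread and a subscriber served on a different thread is not, by itself, a reason for items to go missing. Here is a minimal sketch of that hand-off using only the JDK's java.util.concurrent.Flow API. It does not use Spring Integration or Reactor, and the class name, thread name, and payloads are hypothetical, chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CrossThreadDelivery {

    /** Records each received item together with the name of the delivering thread. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> items = new CopyOnWriteArrayList<>();
        final List<String> threads = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Unbounded demand: accept everything the publisher has.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            items.add(item);
            threads.add(Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Submits the payloads from a separate "listener" thread and waits for completion. */
    static RecordingSubscriber run(String... payloads) throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // Subscribe first, then publish: the subscription is the same
            // publisher instance that the producing thread submits to.
            publisher.subscribe(subscriber);
            Thread listener = new Thread(() -> {
                for (String payload : payloads) {
                    publisher.submit(payload);
                }
            }, "listener-thread");
            listener.start();
            listener.join();
        } // close() signals onComplete once buffered items are delivered
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber;
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingSubscriber result = run("m1", "m2");
        System.out.println(result.items + " delivered on " + result.threads);
    }
}
```

The items are submitted on listener-thread and arrive in onNext on an executor thread. What matters in this sketch is that the subscriber is attached to the same publisher instance the producer writes to, not which thread each side runs on.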
Tags: spring spring-boot spring-integration reactive-programming project-reactor