【问题标题】:Synchronize asynchronous communication同步异步通信
【发布时间】:2017-05-21 08:25:16
【问题描述】:

我有一个 REST 服务,它接收一些数据并通过异步 IBM MQ 请求检查数据。

REST 控制器:

@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {

        String data = request.getData();

        jmsSender.send(data);

        // Now I need the response from MQ
        // String mqResponse = ...
        if (mqIsValid(mqResponse)) {
            return createValidResponse();
        }
        return createNotValidResponse();
    }
}

MQ 发送者:

@Service
public class JmsSender {

    public void send(String data) {
        jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
    }

}

MQ 接收器:

@Component
public class JmsReceiver {

    @JmsListener(destination = "QUEUE.FROM.MQ, containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message) {
        // How to pass the message to the controller?
    }

}

如何等待来自 MQ 的正确数据在控制器中创建正确的响应?

是否可以像here 所述那样使用BlockingQueue?就我而言,我必须区分数据。我不能只从阻塞队列中取出第一个数据。

例如,如果同时有两个 REST 请求(带有数据:abcxyz)。如何确保回复正确的答案,而不仅仅是我从 MQ 得到的第一个答案?

我也无法更改 MQ 接口。

【问题讨论】:

  • 我不知道你是否可以使用 JMS 实现这一点。几个月前我不得不做一些类似的事情,我不得不使用 IBM MQ 类来实现这一点。如果可以使用 MQ 类,则必须设置并使用唯一的correlationId 来匹配响应和请求。
  • 谢谢,我会调查的。实际上,数据本身包含一个唯一的 ID,我可以用它来识别它。但是如何告诉 REST 控制器等待 MQ 的响应并将响应传递给控制器​​?

标签: java rest spring-boot ibm-mq


【解决方案1】:

尝试使用如下所示的 CountDownLatch。

@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {
        final CountDownLatch jmsLatch = new CountDownLatch (1);

        String data = request.getData();

        jmsSender.send(data, jmsLatch);

        try {
            latch.await();  // wait untill latch counted down to 0
        } catch (InterruptedException e) {
            return createNotValidResponse();
        }

        return createValidResponse();
    }
}

修改send方法,从控制器获取CountDownLatch。

@Service
public class JmsSender {

    public void send(String data, final CountDownLatch jmsLatch) {
        jmsLatch.await();
        jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
    }

}

修改receive方法,从控制器获取相同的CountDownLatch。

@Component
public class JmsReceiver {

    @JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message, final CountDownLatch jmsLatch) {
        // Pass the message to the controller
        jmsLatch.countDown();
    }

}

这里的技巧是你必须将相同的 CountDownLatch 实例从控制器传播到发送者和接收者类,并在收到消息后调用 countDown 方法。

【讨论】:

  • 您使用CountDownLatch 有什么原因吗?不能用Object#waitObject#notifyReentrantLock 之类的监视器来实现相同的功能吗?我怎样才能确保从 MQ 得到正确的答复?如果例如两个请求发送到 MQ,第一个 MQ 响应用于第二个请求。
  • 当我遇到类似情况时,我更喜欢CounDownLatch。但是如果我们采用等待/通知方法,您将在哪个对象上使用等待/通知?
  • 对于第二个问题,您必须向我提供有关如何获取值的代码。
【解决方案2】:

由于找不到适合我的解决方案,我创建了一个简单的等待机制来获取数据。

MqReceiver:

@Component
public class JmsReceiver {

    private final Lock lock;
    private final Condition containsKey;
    private final Map<String, String> responses;

    public JmsReceiver() {
        this.lock = new ReentrantLock();
        this.containsKey = lock.newCondition();
        this.responses = new HashMap<>();
    }

    @JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message) {
        put(getKeyFromMessage(message), message);
    }

    public String get(String key) throws InterruptedException {
        lock.lock();
        try {
            while (!responses.containsKey(key)) {
                containsKey.await();
            }
            return responses.get(key);
        } finally {
            lock.unlock();
        }
    }

    public void put(String key, String messagee) {
        lock.lock();
        try {
            responses.put(key, messagee);
            containsKey.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

这可以在控制器中使用:

@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {

        String data = request.getData();

        jmsSender.send(data);

        String key = getKeyFromData(data);
        // waits until MQ sends the data
        String mqResponse = jmsReceiver.get(key);

        if (mqIsValid(mqResponse)) {
            return createValidResponse();
        }
        return createNotValidResponse();
    }
}

【讨论】:

    【解决方案3】:

    使用 jms(activemq) 实现请求-回复模式的场景同步异步解决方案

    此示例的想法是在不同 jvm 中的两个不同服务中工作。该解决方案与多个实例服务同时进行了测试:

    • Service 1 (M1) - Rest api 同步并在某些时候启动 异步流使用 activemq 调用第二个 Service M2 实现集成模式Request-Reply。您不需要停止或等待任何线程,jms 模式实现了 ack Session.AUTO_ACKNOWLEDGE

      @PostMapping
      public AnyDto sendMessage(final AnyDto anyDto) {
          return routeService.send(anyDto);
      }
      public void flowOrchestation (final anyDto data) throws JMSException {
          final ObjectMessage objectMessage = composeTemplateMessage(data);
          final AnyDto responseDto = jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue("queue.request"),
                  objectMessage, AnyDto.class);
      }
      private ObjectMessage composeTemplateMessage(final AnyDto data) throws JMSException {
      
          jmsTemplate.setReceiveTimeout(10000L);
          jmsMessagingTemplate.setJmsTemplate(jmsTemplate);
      
          Session session = jmsMessagingTemplate.getConnectionFactory().createConnection()
                  .createSession(false, Session.AUTO_ACKNOWLEDGE);
      
          final ObjectMessage objectMessage = session.createObjectMessage(data);
      
          objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
          objectMessage.setJMSReplyTo(new ActiveMQQueue("queue.response"));
          objectMessage.setJMSExpiration(0);
          objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
          return objectMessage;
      }
      

    超时和过期可以根据您的要求进行修改。 0过期意味着没有时间过期。

    • Service 2 (M2):只要接收到消息,回复 M1 上设置的 JmsReplyTo。

    @Component public class Consumer implements SessionAwareMessageListener<Message> { @Override @JmsListener(destination = "${queue.request}") public void onMessage(Message message, Session session) throws JMSException { AnyDto anyDto = (AnyDto) ((ActiveMQObjectMessage) message).getObject(); //do some stuff final ObjectMessage responseMessage = new ActiveMQObjectMessage(); responseMessage.setJMSCorrelationID(message.getJMSCorrelationID()); responseMessage.setObject(dtoModified); final MessageProducer producer = session.createProducer(message.getJMSReplyTo()); producer.send(responseMessage); }}

    【讨论】:

      猜你喜欢
      • 2014-12-20
      • 2019-01-14
      • 2015-08-03
      • 1970-01-01
      • 2015-04-11
      • 2017-02-24
      • 2012-04-23
      • 2020-03-25
      • 1970-01-01
      相关资源
      最近更新 更多