【问题标题】:How to wrap a JMS to WebSphere MQ bridge in a synchronous call using the request-reply pattern?如何使用请求-回复模式在同步调用中将 JMS 包装到 WebSphere MQ 桥接器?
【发布时间】:2011-05-23 02:12:09
【问题描述】:

我只是在为我处理一个新场景,我相信这可能对某些人来说很常见:)..

根据要求,我需要构建一种用户体验,就像 Web 服务调用的同步在线事务一样,它实际上使用异步 JMS-MQ 桥将调用委托给 IBM MQ 系列。

客户端调用 Web 服务,然后他的消息应该发布到应用服务器上的 JMS 队列中,该队列将被传递到 WebSphere MQ,并且在处理完响应后,将在 FIXED JMS 队列端点中将响应传递回应用服务器。

如果 WebSphere MQ 未在定义的时间内交付响应,则该要求处理需要超时的事务,而 Web 服务应向客户端发送超时信号并忽略此事务。

问题的草图如下。

我需要阻止网络服务上的请求,直到响应到达或超时。

我正在寻找一些开放的图书馆来帮助我完成这项任务。 或者唯一的解决方案是阻塞一个线程并继续池化响应? 也许我可以用一个监听器实现一些块,以便在响应到达时得到通知?

现在进行一些讨论对我尝试澄清我的想法非常有帮助。 有什么建议吗?

我有一个草图,希望能帮助清除图片;)

【问题讨论】:

    标签: asynchronous jms weblogic integration ibm-mq


    【解决方案1】:

    您好,感谢您发布自己的解决方案!

    是的,在这种情况下,接收超时是最优雅的方法。

    注意由于超时而未读取的消息会发生什么情况。如果您的客户再次访问同一个队列,他可能会收到一条陈旧的消息。

    确保及时删除超时的消息(如果没有其他原因,则不要将未处理的消息填满队列)。

    您可以通过代码(在消息生产者上设置生存时间)或在 Websphere MQ 服务器上(使用使消息自动过期的队列)轻松地做到这一点。

    如果您不能/不想修改代码的 MQ 端,后者会更容易。这就是我会做的:)

    【讨论】:

    • 是的,我们有一个错误队列,超时消息在其中转发,另一段代码处理它们。
    【解决方案2】:

    经过几天的编码,我找到了解决方案。我正在使用带有 JAX-WS 注释和标准 JMS 的标准 EJB3。

    到目前为止,我为满足要求而编写的代码如下。它是一个带有 bean 管理事务(BMT)的无状态会话 Bean,因为使用标准容器管理事务(CMT)会导致某种挂起,我相信是因为我试图将两个 JMS 交互放在同一个事务中,因为它们在同样的方法,所以请注意我必须为与 JMS 队列的每次交互启动和完成事务。我正在为这个解决方案使用 weblogic。而且我还编写了一个 MDB,它基本上使用来自队列端点 jms/Pergunta 的消息,并将响应消息放在 jms/Resposta 队列上,我这样做是为了模拟这个问题的 MQ 端的预期行为。实际上,在实际场景中,我们可能会在大型机上使用一些 COBOL 应用程序,甚至是其他 java 应用程序来处理消息并将响应放入响应队列中。

    如果有人需要尝试这段代码,基本上你只需要一个容器 J2EE5 并使用 jndi 名称配置 2 个队列:jms/Pergunta 和 jms/Resposta。

    EJB/Webservice 代码:

    @Stateless
    @TransactionManagement(TransactionManagementType.BEAN)
    @WebService(name="DJOWebService")
    public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {
    
        Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());
    
        @Resource
        SessionContext ejbContext;
    
        // Defines the JMS connection factory.
        public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
    
        // Defines request queue
        public final static String QUEUE_PERG = "jms/Pergunta";
    
        // Defines response queue
        public final static String QUEUE_RESP = "jms/Resposta";
    
    
        Context ctx;
        QueueConnectionFactory qconFactory;
    
        /**
         * Default constructor. 
         */
        public DJOSessionBeanWS() {
            log.info("Construtor DJOSessionBeanWS");
        }
    
        @WebMethod(operationName = "processaMensagem")
        public String processaMensagem(String mensagemEntrada, String idUnica)
        {
            //gets UserTransaction reference as this is a BMT EJB.
            UserTransaction ut = ejbContext.getUserTransaction();
            try {
    
                ctx = new InitialContext();
                //get the factory before any transaction it is a weblogic resource.
                qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
                log.info("Got QueueConnectionFactory");
                ut.begin();
                QueueConnection qcon = qconFactory.createQueueConnection();
                QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
                TextMessage message = qsession.createTextMessage("this is a request message");
                message.setJMSCorrelationID(idUnica);
                qsession.createSender(qs).send(message);
                ut.commit();
                qcon.close();
                //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
                ut.begin();
                QueueConnection queuecon = qconFactory.createQueueConnection();
                Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
                QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
                //creates que receiver and sets a message selector to get only related message from the response queue.
                        QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
                queuecon.start();
                //sets the timeout to keep waiting for the response...
                TextMessage tresposta = (TextMessage) qr.receive(10000);
                if(tresposta != null)
                {
                    ut.commit();
                    queuecon.close();
                    return(tresposta.toString());
                }
                else{
                    //commints anyway.. does not have a response though 
                    ut.commit();
                    queuecon.close();
                    log.info("null reply, returned by timeout..");
                    return "Got no reponse message.";
                }
    
    
    
            } catch (Exception e) {
                log.severe("Unexpected error occurred ==>> " + e.getMessage());
                e.printStackTrace();
                try {
                    ut.commit();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                return "Error committing transaction after some other error executing ==> " + e.getMessage();
            } 
    
        }
    }   
    

    这是 MDB 的代码,它模拟了这个问题的 MQ 方面。我在测试期间有一个 Thread.sleep 片段来模拟和测试客户端的超时以验证解决方案,但此版本中不存在。

    /**
     * Mock to get message from request queue and publish a new one on the response queue.
     */
    @MessageDriven(
            activationConfig = { @ActivationConfigProperty(
                    propertyName = "destinationType", propertyValue = "javax.jms.Queue"
            ) }, 
            mappedName = "jms/Pergunta")
    public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {
    
        Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());
    
        // Defines the JMS connection factory.
        public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
    
        // Define Queue de resposta
        public final static String QUEUE_RESP = "jms/Resposta";
    
    
        Context ctx;
        QueueConnectionFactory qconFactory;
    
    
    
        /**
         * Default constructor. 
         */
        public ConsomePerguntaPublicaRespostaMDB() {
            log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB");
            try {
                ctx = new InitialContext();
            } catch (NamingException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * @see MessageListener#onMessage(Message)
         */
        public void onMessage(Message message) {
            log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage");
            TextMessage tm = (TextMessage) message;
    
            try {
                log.info("Mensagem recebida no onMessage ==>> " + tm.getText());
    
                //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta.
                 String idMensagem = tm.getJMSCorrelationID();
                 log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem);
    
                qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
                log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem");
                QueueConnection qcon = qconFactory.createQueueConnection();
                QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
                TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta...");
                tmessage.setJMSCorrelationID(idMensagem);
                qsession.createSender(queue).send(tmessage);
            } catch (JMSException e) {
                log.severe("Erro no onMessage ==>> " + e.getMessage());
                e.printStackTrace();
            }  catch (NamingException e) {
                log.severe("Erro no lookup ==>> " + e.getMessage());
                e.printStackTrace();
            }
    
        }
    
    }
    

    []s

    【讨论】:

      猜你喜欢
      • 2012-07-30
      • 1970-01-01
      • 2010-11-14
      • 1970-01-01
      • 2010-11-24
      • 2010-10-28
      • 2014-06-14
      • 2012-03-10
      • 2015-07-08
      相关资源
      最近更新 更多