【问题标题】:Testing JMS and Spring Integration测试 JMS 和 Spring 集成
【发布时间】:2015-05-19 01:10:47
【问题描述】:

我正在尝试编写一个测试类,以测试侦听 JMS 队列的消息驱动通道适配器是否将消息转发到正确的通道(参考 Advanced Spring Integration Testing)。以下是测试上下文 xml:

<!--  MockRunner configuration  -->
    <bean id="destinationManager" class="com.mockrunner.jms.DestinationManager"/>

    <bean id="outgoingDestination" factory-bean="destinationManager" factory-method="createQueue">
        <constructor-arg index="0" value="demoMockRunnerQueue"/>
    </bean>

    <bean id="configurationManager" class="com.mockrunner.jms.ConfigurationManager"/>

    <bean id="connectionFactory" class="com.mockrunner.mock.jms.MockQueueConnectionFactory">
        <constructor-arg index="0" ref="destinationManager"/>
        <constructor-arg index="1" ref="configurationManager"/>
    </bean>

    <!--  Spring JMS Template -->
    <bean id="jmsTemplate" class="org.mockito.Mockito" factory-method="mock">
        <constructor-arg value="org.springframework.jms.core.JmsTemplate" />
    </bean>

这里是带有消息驱动通道的spring集成配置:

<int:channel id="inbound"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
                                            channel="inbound"
                                            destination="outgoingDestination"
                                            connection-factory="connectionFactory"
                                            acknowledge="transacted"/>

<int:service-activator input-channel="inbound"
                            ref="messageQueueConsumer"
                            method="consumeMessage"/>

<bean id="messageQueueConsumer" class="uk.co.example.consumer.SimpleMessageConsumer">
    </bean>

下面是包含测试的 java 类:

@Resource
JmsTemplate jmsTemplate;

/**
 * "inbound" is the channel used to trigger the service activator (i.e. the message consumer)
 * */
@Resource
@Qualifier("inbound")
SubscribableChannel inbound;

private static final Logger LOGGER = Logger.getLogger(InboundChannelFlowUnitTest.class);

/**
 * This test verifies that a message received on a polling JMS inbound channel adapter is
 * routed to the designated channel and that the message payload is as expected
 *
 * @throws JMSException
 * @throws InterruptedException
 * @throws IOException
 */
@Test
public void testReceiveMessage() throws JMSException, InterruptedException, IOException {
    String msg = "hello";

    boolean sent = verifyJmsMessageReceivedOnChannel(msg, inbound, new CountDownHandler() {

                @Override
                protected void verifyMessage(Message<?> message) {
                    assertEquals("hello", message.getPayload());
                }
            }
    );
    assertTrue("message not sent to expected output channel", sent);
}

/**
 * Provide a message via a mock JMS template and wait for the default timeout to receive the message on the expected channel
 * @param obj The message provided to the poller (currently must be a String)
 * @param expectedOutputChannel The expected output channel
 * @param handler An instance of CountDownHandler to handle (verify) the output message
 * @return true if the message was received on the expected channel
 * @throws JMSException
 * @throws InterruptedException
 */
protected boolean verifyJmsMessageReceivedOnChannel(Object obj, SubscribableChannel expectedOutputChannel, CountDownHandler handler) throws JMSException, InterruptedException{
    return verifyJmsMessageOnOutputChannel(obj, expectedOutputChannel, handler, 2000);
}

/**
 * Provide a message via a mock JMS template and wait for the specified timeout to receive the message on the expected channel
 * @param obj The message provided to the poller (currently must be a String)
 * @param expectedOutputChannel The expected output channel
 * @param handler An instance of CountDownHandler to handle (verify) the output message
 * @param timeoutMillisec The timeout period. Note that this must allow at least enough time to process the entire flow. Only set if the default is
 * not long enough
 * @return true if the message was received on the expected channel
 * @throws JMSException
 * @throws InterruptedException
 */
protected boolean verifyJmsMessageOnOutputChannel(Object obj, SubscribableChannel expectedOutputChannel, CountDownHandler handler,int timeoutMillisec) throws JMSException,
        InterruptedException {

    if (!(obj instanceof String)) {
        throw new IllegalArgumentException("Only TextMessage is currently supported");
    }

    /*
     * Use mocks to create a message returned to the JMS inbound adapter. It is assumed that the JmsTemplate
     * is also a mock.
     */

    TextMessage message = mock(TextMessage.class);
    doReturn(new SimpleMessageConverter()).when(jmsTemplate).getMessageConverter();
    doReturn(message).when(jmsTemplate).receiveSelected(anyString());

    String text = (String) obj;

    CountDownLatch latch = new CountDownLatch(1);
    handler.setLatch(latch);

    doReturn(text).when(message).getText();

    expectedOutputChannel.subscribe(handler);

    boolean latchCountedToZero = latch.await(timeoutMillisec, TimeUnit.MILLISECONDS);

    if (!latchCountedToZero) {
        LOGGER.warn(String.format("The specified waiting time of the latch (%s ms) elapsed.", timeoutMillisec));
    }

    return latchCountedToZero;
}

/*
 * A MessageHandler that uses a CountDownLatch to synchronize with the calling thread
 */
private abstract class CountDownHandler implements MessageHandler {

    CountDownLatch latch;

    public final void setLatch(CountDownLatch latch){
        this.latch = latch;
    }

    protected abstract void verifyMessage(Message<?> message);

    /*
     * (non-Javadoc)
     *
     * @see
     * org.springframework.integration.core.MessageHandler#handleMessage
     * (org.springframework.integration.Message)
     */
    public void handleMessage(Message<?> message) throws MessagingException {
        verifyMessage(message);
        latch.countDown();
    }
}

但我得到以下异常:

[0;33mWARN  [main] [InboundChannelFlowUnitTest] The specified waiting time of the latch (2000 ms) elapsed.
[m
java.lang.AssertionError: message not sent to expected output channel

有什么提示吗?

编辑:

我添加了以下测试:

    @SuppressWarnings("unchecked")
    @Test
    public void testMessageDriven() throws Exception {
        TextMessage message = mock(TextMessage.class);
        when(message.getText()).thenReturn("foo");
        Session session = mock(Session.class);
        ((SessionAwareMessageListener<TextMessage>) this.messageListenerContainer.getMessageListener()).onMessage(message, session);
        CountDownHandler myCountDownHandler = new CountDownHandler() {
            @Override
            protected void verifyMessage(Message<?> message) {
                assertNotNull(message);
                assertEquals("hello", message.getPayload());
            }
        };
        CountDownLatch myLatch = new CountDownLatch(2);
        myCountDownHandler.setLatch(myLatch);
        this.inbound.subscribe(myCountDownHandler);

        boolean receivedBeforeZero = myLatch.await(3, TimeUnit.SECONDS);

        assertTrue(receivedBeforeZero);
    }

并将消息驱动的适配器更改为:

<int-jms:message-driven-channel-adapter id="jmsIn"
                                            channel="inbound"
                                            container="messageListenerContainer"
                                            acknowledge="transacted"/>

但还是报如下错误:

[0;33mWARN  [main] [InboundChannelFlowUnitTest] The specified waiting time of the latch (3 sec) elapsed.
[m
java.lang.AssertionError
    at org.junit.Assert.fail(Assert.java:92)
    at org.junit.Assert.assertTrue(Assert.java:43)
    at org.junit.Assert.assertTrue(Assert.java:54)

【问题讨论】:

    标签: spring jms spring-integration


    【解决方案1】:

    消息驱动的适配器不使用JmsTemplate,所以模拟它和它的接收方法不会做任何事情。

    您必须模拟/存根消息侦听器容器并调用其MessageListener。您可以通过“容器”属性将模拟容器提供给适配器。

    编辑

    尚不完全清楚为什么需要模拟/测试框架组件;您只需将测试消息发送到通道即可将测试消息注入您的流程。

    但是,如果您使用的是自定义消息转换器,并且想要就地测试它,您可以模拟容器。

    Here's how to do it.

    【讨论】:

    • 感谢加里的回答。如果我这样做,那么在 verifyJmsMessageOnOutputChannel 方法中,我是否仍然需要使用 CountDownLatch 并订阅频道的处理程序,或者我可以使用模拟 jmsTemplate 将消息发送到队列和侦听器应该捡起来吗?
    • 我也在努力理解如何模拟侦听器容器并调用 MessageListener,有什么可以参考的吗?
    • 谢谢加里。我基本上想测试一旦适配器检测到队列上收到消息,消息驱动的适配器是否路由到正确的通道。我假设要测试此流程,我必须模拟侦听器收到的消息。
    • 实际上,我已经用我得到的新测试和新错误更新了我的问题。我不想手动从通道接收消息,但一旦侦听器检测到队列上的消息,就测试正确的通道是否接收到消息。希望我很清楚。
    • 嗨,加里,尝试效仿你的例子,我得到了一个例外 Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.。我认为这是因为 queueChannel 定义为没有轮询器的队列。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-05-06
    • 1970-01-01
    • 1970-01-01
    • 2018-02-08
    • 2012-04-22
    • 1970-01-01
    • 2013-08-21
    相关资源
    最近更新 更多