【问题标题】:I need to mock a RabbitMQ in my unit Test我需要在我的单元测试中模拟一个 RabbitMQ
【发布时间】:2016-05-09 10:25:38
【问题描述】:

我在我的项目中使用 RabbitMQ。

我的消费者中有一部分 RabbitMQ 客户端代码,连接真正的 MQ 时需要使用 TLS 1.1。

我想在我的 JUnit 测试中测试此代码并模拟向我的消费者传递的消息。

我在 Google 上看到了几个使用不同工具的示例,例如 Apache Camel 的 RabbitMQ 组件或 ActiveMQ,但这些工具适用于 AMQP 1.0,而 RabbitMQ 仅支持 AMQP 0.9。

有人遇到过这个问题吗?

谢谢!

更新

这是测试从队列中接收 json 的代码。

package com.foo.foo.queue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Background thread that consumes JSON messages from a RabbitMQ fanout exchange over TLS and
 * passes each one to the configured {@link PolicyFinder}.
 *
 * <p>A single shared instance is obtained through {@link #getInstance()}. Set the policy finder
 * with {@link #setLuxPolicyFinder(PolicyFinder)} before calling {@link #start()}, and call
 * {@link #close()} from another thread to stop consuming and release the broker connection.
 */
public class BrokerThreadHLConsumer extends Thread {

    private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);

    private static BrokerThreadHLConsumer instance;

    private Channel channel;
    private String queueName;
    private PolicyFinder policyFinder;
    private Connection connection;

    /** Assigned in run() and read in close() on another thread, hence volatile. */
    private volatile QueueingConsumer consumer;

    /** Cleared by close() on the caller's thread and polled by run(), hence volatile. */
    private volatile boolean loop;

    /**
     * Builds the TLS context from the key store and trust store found on the classpath, connects
     * to the broker, declares the fanout exchange and binds a server-named queue to it.
     *
     * <p>Any failure is logged and leaves the instance without a channel; {@link #run()} then
     * refuses to start instead of failing later.
     *
     * @throws IOException kept for signature compatibility; failures are currently logged
     */
    private BrokerThreadHLConsumer() throws IOException {
        ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
        String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
        String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
        try {
            // NOTE(review): both passphrases are hard-coded; consider moving them to configuration.
            char[] keyPassphrase = "clientrabbit".toCharArray();
            char[] trustPassphrase = "changeit".toCharArray();

            /* Client certificate and key presented to the message queue */
            KeyStore clientStore = loadKeyStore("PKCS12", "certs/client.keycert.p12", keyPassphrase);
            KeyManagerFactory keyManager = KeyManagerFactory.getInstance("SunX509");
            keyManager.init(clientStore, keyPassphrase);

            /* Certificates trusted when verifying the broker */
            KeyStore trustStore = loadKeyStore("JCEKS", "certs/cacerts", trustPassphrase);
            TrustManagerFactory trustManager = TrustManagerFactory.getInstance("SunX509");
            trustManager.init(trustStore);

            // NOTE(review): TLSv1.1 is what the original code requests; confirm it is still
            // enabled on the target JVM and broker before relying on it.
            SSLContext sslContext = SSLContext.getInstance("TLSv1.1");
            sslContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);

            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(rabbitHost);
            factory.useSslProtocol(sslContext);
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "fanout");
            queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, exchangeName, "");
        } catch (Exception e) {
            // Pass the throwable to the logger so the full stack trace is recorded.
            log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost, e);
        }
    }

    /**
     * Loads a key store of the given type from a classpath resource.
     *
     * <p>The resource is read through its URL stream, so it also works when packaged in a jar,
     * and the stream is always closed.
     *
     * @param type         key store type, e.g. {@code "PKCS12"} or {@code "JCEKS"}
     * @param resourcePath classpath-relative location of the store
     * @param passphrase   passphrase protecting the store
     * @return the loaded key store
     * @throws IOException if the resource is missing or cannot be read
     */
    private static KeyStore loadKeyStore(String type, String resourcePath, char[] passphrase)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        URL resource = BrokerThreadHLConsumer.class.getClassLoader().getResource(resourcePath);
        if (resource == null) {
            throw new IOException("Classpath resource not found: " + resourcePath);
        }
        KeyStore store = KeyStore.getInstance(type);
        java.io.InputStream in = resource.openStream();
        try {
            store.load(in, passphrase);
        } finally {
            in.close();
        }
        return store;
    }

    /**
     * Returns the shared consumer, creating it on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot open two broker connections.
     *
     * @return the single consumer instance
     */
    public static synchronized BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        if (instance == null) {
            instance = new BrokerThreadHLConsumer();
        }
        return instance;
    }

    /**
     * Subscribes to the queue and processes deliveries until {@link #close()} is called, the
     * thread is interrupted, or the channel/consumer is shut down.
     */
    @Override
    public void run() {
        if (policyFinder == null) {
            log.error("Consumer couldn't start cause of PolicyFinder is null");
            return;
        }
        if (channel == null) {
            // The constructor logs and swallows connection failures, so guard here.
            log.error("Consumer couldn't start: no channel to the broker is available");
            return;
        }
        try {
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            consumer = queueingConsumer;
            channel.basicConsume(queueName, true, queueingConsumer);
            log.info("Consumer broker started and waiting for messages");
            loop = true;
            while (loop) {
                QueueingConsumer.Delivery delivery;
                try {
                    delivery = queueingConsumer.nextDelivery();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever owns this thread and stop.
                    Thread.currentThread().interrupt();
                    if (loop) {
                        log.error("Receiving message error", e);
                    }
                    break;
                } catch (RuntimeException e) {
                    // nextDelivery() only fails with shutdown/cancel signals; retrying would
                    // rethrow immediately and spin, so leave the loop.
                    if (loop) {
                        log.error("Receiving message error", e);
                    }
                    break;
                }
                handleDelivery(delivery);
            }
        } catch (IOException e) {
            log.error("Consumer couldn't start", e);
        }
    }

    /**
     * Parses one delivery as JSON and forwards it to the policy finder. Failures are logged so a
     * single bad message does not stop the consumer.
     */
    private void handleDelivery(QueueingConsumer.Delivery delivery) {
        try {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String message = new String(delivery.getBody(), "UTF-8");
            JSONObject obj = new JSONObject(message);
            log.info("Message received from broker " + obj);
            if (StringUtils.isNotEmpty(message) && !policyFinder.managePolicySet(obj)) {
                log.error("PolicySet error: error upgrading the policySet");
            }
        } catch (Exception e) {
            log.error("Receiving message error", e);
        }
    }

    /**
     * Stops the consume loop and releases the consumer, channel and connection. Safe to call
     * before {@link #run()} has started or after a failed construction.
     */
    public void close() {
        loop = false;
        QueueingConsumer activeConsumer = consumer;
        if (activeConsumer != null) {
            try {
                activeConsumer.getChannel().basicCancel(activeConsumer.getConsumerTag());
            } catch (IOException e) {
                log.error("Couldn't cancel the consumer", e);
            }
        }
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                log.error("Couldn't close the channel", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                log.error("Couldn't close the connection", e);
            }
        }
    }

    /**
     * Sets the policy finder that receives every decoded message. Must be called before
     * {@link #start()}, otherwise {@link #run()} logs an error and returns.
     *
     * @param policyFinder handler for incoming policy sets
     */
    public void setLuxPolicyFinder(PolicyFinder policyFinder) {
        this.policyFinder = policyFinder;
    }
}

【问题讨论】:

  • 要测试的代码好吗?到目前为止你尝试过什么?
  • 对集成点进行单元测试没有多大意义,因为它提供不了什么价值。毕竟,您模拟出来的整个 API 可能与真正的代理(broker)并不兼容。我并不是说您不应该测试这部分,而是应该使用集成测试。在您的基础架构中运行一个用于测试的 RabbitMQ 实例,并针对它运行集成测试。您可以在每次测试运行时随机创建一个通道,这样就不会发生冲突。或者,如果您使用 Docker 或类似的工具,可以启动一个私有的代理(broker)。
  • 有时 SO 在回答问题时表现得像人类 - 绝望!!!

标签: java junit mocking rabbitmq amqp


【解决方案1】:

据我了解,问题中有两件事要测试:

  • TLS 配置连接到 RabbitMQ
  • basicPublish / basicConsume(所谓的delivery)与应用程序其余部分交互的行为

对于第一个,由于 TLS 本身正在测试中,只有连接到配置正确信任库的 RabbitMQ 真实实例才能证明配置正常

但是,对于第二个,如果是用于演示应用程序功能的测试(可使用 Cucumber 等工具提高可读性),您可以尝试我正在开发的库:rabbitmq-mock(这也是我翻出这个旧帖子的原因)

只需将其作为依赖项包含:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>

并在您的单元测试中将new ConnectionFactory() 替换为new MockConnectionFactory()

项目中有示例:https://github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java

【讨论】:

    【解决方案2】:

    我知道,这是一个老问题,但到目前为止还没有答案。在同一个问题上对我有很大帮助的是以下博客文章:https://tamasgyorfi.net/2016/04/21/writing-integration-tests-for-rabbitmq-based-components/。 它使用 Apache QPID(不是 OP 中建议的 ActiveMQ),并且支持 AMQP 0.9.1。

    【讨论】:

      【解决方案3】:

      这就是我的做法。为了隐藏必要的类实现细节,有些代码可能显得比较零散,但你应该能从中得到一些启发! :)

      • 单元测试的假设:
        • RMQ 工作正常,发送到它的数据将被推送到队列中
        • 唯一需要测试的是生成的数据是否正确
        • 以及对 RMQs send() 的调用是否正在发生!

       public class SomeClassTest {
              private Config config;
              private RmqConfig rmqConfig;
              private static final ObjectMapper mapper = new ObjectMapper();
              private JasperServerClient jasperServerClient;
          //    @Mock
              @InjectMocks
              private RabbitMQProducer rabbitMQProducer;
              private Connection phoenixConnection;
              private String targetNotificationMessage;
              SomeClass someClassObject;
      
              /**
               * Creates the mocked collaborators, builds the RabbitMQ configuration objects
               * and assembles the {@code SomeClass} instance under test before each test.
               */
              @Before
              public void setUp() {

                  // Mock basic stuffs
                  config = mock(Config.class);
                  // Assign the mock to the field; the original assigned to the type name.
                  phoenixConnection = mock(Connection.class);
                  rabbitMQProducer = mock(RabbitMQProducer.class); // Important: the producer is a mock, nothing reaches a broker


                  jasperServerClient = mock(JasperServerClient.class);

                  rmqConfig = RmqConfig.builder()
                          .host("localhost")
                          .port(5672)
                          .userName("guest")
                          .password("guest")
                          .queueName("somequeue_name")
                          .prefetch(1)
                          .build();
                  final String randomMessage = "This is a waste message";
                  Message mockMsg = Message.forSending(randomMessage.getBytes(), null, rmqConfig.getQueueName(), rmqConfig.getQueueName(), "text/plain", "UTF-8", true); // prepare a mock message


                  // Prepare service configs
                  ConnectionConfig connectionConfig = RmqConfigUtil.getConfig(rmqConfig);
                  ProducerConfig producerConfig = new ProducerConfigBuilder()
                          .exchange(rmqConfig.getQueueName())
                          .contentType("text/plain")
                          .contentEncoding("UTF-8")
                          .connection(connectionConfig).build();
                  // Refer to the local declared just above (was misspelled "croducerConfig").
                  rabbitMQProducer.open(producerConfig.asMap());

                  // build the major stuff where the code resides
                  someClassObject =  SomeClass.builder()
                          .phoenixConnection(phoenixConnection)
                          .userExchangeName(rmqConfig.getQueueName())
                          .userRabbitMQProducer(rabbitMQProducer)
                          .ftpConfig(config.getFtpConfig())
                          .jasperServerClient(jasperServerClient)
                          .objectMapper(new ObjectMapper())
                          .build();

                  MockitoAnnotations.initMocks(this);
              }
      
      
              /**
               * Invokes two private methods of the class under test through reflection:
               * first checks the generated notification message, then checks that publishing
               * it calls send() on the mocked RabbitMQProducer exactly once.
               * The {@code <<...>>} tokens are placeholders left by the author and must be
               * replaced with real types/values before this compiles.
               */
              @Test
              public void testNotificationPub() throws Exception {
      
                  // Prepare expected Values
                  targetNotificationMessage = <<some message>>
      
                  // Reflection - the target methods are private
                  Class cls = Class.forName("com.some.path.to.class");
                  Object[] objForGetMessage = {<<stuffs>>, <<stuffs>>};
      
                  Method getNotificationMessage = cls.getDeclaredMethod("private_fn_1", <<some class>>.class, <<some class>>.class);
                  Method pubNotification = cls.getDeclaredMethod("private_fn_2", <<some class>>.class, RabbitMQProducer.class, String.class);
      
                  // Lift the private access restriction so invoke() is allowed
                  getNotificationMessage.setAccessible(true);
                  pubNotification.setAccessible(true);
      
                  // Test Case #1 - the generated message matches the expected one
                  final <<some class>> notificationMessage = (<<some class>>)getNotificationMessage.invoke(someClassObject, objForGetMessage);
                  assertEquals(notificationMessage.getMessage(), targetNotificationMessage);
      
                  // Test Case #2 - this does the RMQ call (against the mocked producer)
                  Object[] objPubMessage = {notificationMessage, rabbitMQProducer, rmqConfig.getQueueName()};
                  final Object publishNotification = pubNotification.invoke(someClassObject, objPubMessage);
                  // NOTE(review): publishNotificationResp is not declared anywhere in this snippet - define the expected response first
                  assertEquals(publishNotificationResp, publishNotification); // voila
      
      
                  // Important: since RabbitMQProducer is mocked, verify that "send" (which would push data to RMQ) was called exactly once
                  verify(rabbitMQProducer,times(1)).send(any());
      
              }
      
      
              /** Sanity check that setUp() populated the configuration fixtures. */
              @Test
              public void testMockCreation(){
                  assertNotNull(rmqConfig);
                  assertNotNull(config);
              }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-04-04
        • 2015-02-05
        相关资源
        最近更新 更多