【发布时间】:2016-05-09 10:25:38
【问题描述】:
我在我的项目中使用 RabbitMQ。
我的消费者中有rabbitMQ客户端部分的代码,连接需要一个tls1.1来连接真正的MQ。
我想在我的 JUnit 测试中测试此代码并模拟向我的消费者传递的消息。
我在 google 中看到了几个使用不同工具的示例,骆驼兔或 activeMQ 是如何使用的,但此工具适用于 amqp 1.0,而 rabbitMQ 仅适用于 amqp 0.9。
有人遇到过这个问题吗?
谢谢!
更新
这是测试从队列中接收 json 的代码。
package com.foo.foo.queue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class BrokerThreadHLConsumer extends Thread {
private static BrokerThreadHLConsumer instance;
private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);
private Channel channel;
private String queueName;
private PolicyFinder PolicyFinder;
private Connection connection;
private QueueingConsumer consumer;
private boolean loop;
private BrokerThreadHLConsumer() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
char[] keyPassphrase = "clientrabbit".toCharArray();
KeyStore keyStoreCacerts;
ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
try {
/* Public key cacerts to connect to message queue*/
keyStoreCacerts = KeyStore.getInstance("PKCS12");
URL resourcePublicKey = this.getClass().getClassLoader().getResource("certs/client.keycert.p12");
File filePublicKey = new File(resourcePublicKey.toURI());
keyStoreCacerts.load(new FileInputStream(filePublicKey), keyPassphrase);
KeyManagerFactory keyManager;
keyManager = KeyManagerFactory.getInstance("SunX509");
keyManager.init(keyStoreCacerts, keyPassphrase);
char[] trustPassphrase = "changeit".toCharArray();
KeyStore tks;
tks = KeyStore.getInstance("JCEKS");
URL resourceCacerts = this.getClass().getClassLoader().getResource("certs/cacerts");
File fileCacerts = new File(resourceCacerts.toURI());
tks.load(new FileInputStream(fileCacerts), trustPassphrase);
TrustManagerFactory tmf;
tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(tks);
SSLContext c = SSLContext.getInstance("TLSv1.1");
c.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);
factory.setUri(rabbitHost);
factory.useSslProtocol(c);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "fanout");
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
} catch (KeyManagementException e1) {
e1.printStackTrace();
} catch (Exception e) {
log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost);
log.error(e.getStackTrace());
e.printStackTrace();
}
}
public static BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
if (instance == null)
instance = new BrokerThreadHLConsumer();
return instance;
}
public void run() {
if (PolicyFinder != null) {
try {
consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
log.info("Consumer broker started and waiting for messages");
loop = true;
while (loop) {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
JSONObject obj = new JSONObject(message);
log.info("Message received from broker " + obj);
if (StringUtils.isNotEmpty(message) && !PolicyFinder.managePolicySet(obj)) {
log.error("PolicySet error: error upgrading the policySet");
}
} catch (Exception e) {
log.error("Receiving message error");
log.error(e);
}
}
} catch (IOException e) {
log.error("Consumer couldn't start");
log.error(e.getStackTrace());
}
} else {
log.error("Consumer couldn't start cause of PolicyFinder is null");
}
}
public void close() {
loop = false;
try {
consumer.getChannel().basicCancel(consumer.getConsumerTag());
} catch (IOException e) {
e.printStackTrace();
}
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void setLuxPolicyFinder(PolicyFinder PolicyFinder) {
this.PolicyFinder = PolicyFinder;
}
}
【问题讨论】:
-
要测试的代码好吗?到目前为止你尝试过什么?
-
对集成点进行单元测试没有多大意义,因为它没有提供任何价值。毕竟,您可以以与真正的代理不兼容的方式模拟整个 API。我并不是说您不应该对此进行测试,而是使用集成测试。在您的基础架构中运行一个 test rabbitMQ 实例并针对它运行集成测试。您可以在每次测试运行时随机创建一个通道,这样就不会发生冲突。或者,如果你使用 docker 或类似的东西,你可以启动一个私人经纪人。
-
有时 SO 在回答问题时表现得像人类 - 绝望!!!
标签: java junit mocking rabbitmq amqp