【发布时间】:2017-11-17 12:09:26
【问题描述】:
我正在尝试使用 RabbitMQ 的 Java 客户端 API 与 RabbitMQ 服务器交互。我在 Java Client API Guide 中读到:
根据经验,应当避免在线程之间共享 Channel 实例。应用程序应优先为每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel。
我正在尝试使用 corePoolSize 1 的 ThreadPoolExecutor 并添加 Runnable 任务以将消息保存在 RabbitMQ 队列中。这是我正在使用的代码:
package common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * Lazily created singleton that publishes JSON messages to durable RabbitMQ queues.
 *
 * <p>Connection settings are read from {@code config/rabbitmq.properties} on the classpath;
 * any missing key falls back to the defaults coded in the constructor. Publishing is done
 * asynchronously by a single worker thread, so the one {@link Channel} held by this class is
 * only ever used from that thread.
 *
 * <p>Messages that cannot be queued or published are written to the log instead, so the
 * payload is not lost silently.
 *
 * <p>NOTE(review): the channel is never re-created if it gets closed by a channel-level error,
 * and the executor is never shut down — confirm whether the application needs either.
 */
public class RabbitMQUtil {
    private static final Logger log = LoggerFactory.getLogger("logger");
    private static final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";

    // volatile: the field is read outside the lock in getInstance().
    private static volatile RabbitMQUtil gmInstance;

    // One worker thread and a bounded backlog of 10000 pending publish tasks.
    private final ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
    private final Properties properties = new Properties();
    private final String host;
    private final int port;
    private final String username;
    private final String password;
    private final String useSSL;
    private final ConnectionFactory factory;
    private final Connection connection;
    private final Channel channel;

    /**
     * Loads the configuration, opens the connection, registers the listeners and creates the
     * channel.
     *
     * @throws IOException if the connection or channel cannot be opened
     * @throws TimeoutException if connecting times out
     * @throws Exception if the configured port is not a number or TLS was requested but could
     *     not be enabled
     */
    private RabbitMQUtil() throws IOException, TimeoutException, Exception {
        // try-with-resources closes the stream; a null resource (file not found) is skipped.
        try (InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME)) {
            if (stream != null) {
                properties.load(stream);
            }
        } catch (Exception ex) {
            // Best effort: keep going with the defaults below.
            log.error("Exception while loading the rabbitmq properties file:", ex);
        }

        host = properties.getProperty("rabbitmq.host", "localhost");
        port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
        username = properties.getProperty("rabbitmq.username", "guest");
        password = properties.getProperty("rabbitmq.password", "guest");
        useSSL = properties.getProperty("rabbitmq.usessl", "false");

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        if ("true".equalsIgnoreCase(useSSL)) {
            try {
                // NOTE(review): the no-arg useSslProtocol() is documented as not verifying the
                // server certificate — consider supplying a configured SSLContext.
                factory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                // Fail instead of silently sending credentials over a plaintext connection
                // when TLS was explicitly requested; getInstance() logs the failure.
                throw new IllegalStateException("Exception while applying the tls for rabbitmq", e);
            }
        }

        connection = factory.newConnection();
        connection.addBlockedListener(new RabbitMQBlockedListener());
        connection.addShutdownListener(new RabbitMQShutDownListener());
        channel = connection.createChannel();
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the shared instance, or {@code null} if it could not be created; creation is
     *     attempted again on the next call
     */
    public static RabbitMQUtil getInstance() {
        RabbitMQUtil instance = gmInstance;
        if (instance == null) {
            synchronized (RabbitMQUtil.class) {
                instance = gmInstance;
                if (instance == null) {
                    try {
                        instance = new RabbitMQUtil();
                        gmInstance = instance;
                    } catch (Exception e) {
                        log.error("Exception in getInstance:", e);
                    }
                }
            }
        }
        return instance;
    }

    /**
     * Writes a message that could not be sent to RabbitMQ to the log.
     *
     * @param obj the message payload
     * @param queueName the queue the message was destined for
     */
    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
        log.info("data to be saved for :" + queueName + " is:" + obj.toString());
    }

    /**
     * Queues a message for asynchronous publishing on the worker thread.
     *
     * <p>If the backlog is full the message is written to the log instead of being dropped
     * with an exception.
     *
     * @param gson the message payload; must not be {@code null}
     * @param queueName the target queue
     */
    public void saveMsgInQueue(JsonObject gson, String queueName) {
        RabbitMQData task = new RabbitMQData(gson, queueName);
        try {
            this.executor.execute(task);
        } catch (RejectedExecutionException e) {
            log.error("RabbitMQ publish backlog is full, message was not queued:", e);
            saveErrorMessagesInLogs(gson, queueName);
        }
    }

    /** Logs when the broker blocks or unblocks the connection. */
    private static class RabbitMQBlockedListener implements BlockedListener {
        @Override
        public void handleBlocked(String arg0) throws IOException {
            // The {} placeholder is required, otherwise SLF4J drops the reason argument.
            log.warn("blocked listener called: {}", arg0);
        }

        @Override
        public void handleUnblocked() throws IOException {
            log.warn("unblocked listener called:");
        }
    }

    /** Logs connection shutdown events. */
    private static class RabbitMQShutDownListener implements ShutdownListener {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            log.error("Shutdown event listener called:", cause);
            log.error("shutdown event listener:" + cause.isHardError());
        }
    }

    /** Publish task: declares the durable queue and sends one persistent message to it. */
    private class RabbitMQData implements Runnable {
        private final JsonObject obj;
        private final String queueName;

        public RabbitMQData(JsonObject obj, String queueName) {
            // Fail fast on the caller's thread, as the original constructor did for a null payload.
            if (obj == null) {
                throw new NullPointerException("obj");
            }
            this.obj = obj;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            // Name the worker thread here; doing it in the constructor renamed the caller's thread.
            Thread.currentThread().setName("RabbitMQ Thread:" + obj.get("userid") + " -->" + queueName);
            try {
                channel.queueDeclare(this.queueName, true, false, false, null);
                channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC,
                        this.obj.toString().getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                log.error("Error while running the scheduled rabbitmq task:", e);
                saveErrorMessagesInLogs(this.obj, this.queueName);
            }
        }
    }

    /**
     * Convenience entry point: publishes through the shared instance, or logs the message when
     * the instance is unavailable.
     *
     * @param obj the message payload
     * @param queueName the target queue
     */
    public static void saveRabbitMQData(JsonObject obj, String queueName) {
        RabbitMQUtil util = RabbitMQUtil.getInstance();
        if (util != null) {
            util.saveMsgInQueue(obj, queueName);
        } else {
            RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
        }
    }
}
我想知道以下几点:
- 仅使用 1 个线程的线程池时使用单个通道是否可以?
- 在触发阻塞/解除阻塞和关闭事件时,应如何处理连接和通道对象?(不过,当 RabbitMQ 服务器重新启动后,客户端 API 会自动重新建立连接。)
任何其他反馈将不胜感激。
谢谢
【问题讨论】:
标签: java multithreading rabbitmq threadpoolexecutor