【问题标题】:Is it fine to use single channel for communication with a single thread executor in RabbitMQ?使用单通道与 RabbitMQ 中的单线程执行器进行通信是否可以?
【发布时间】:2017-11-17 12:09:26
【问题描述】:

我正在尝试使用 RabbitMQ-java 客户端 API 与 RabbitMQ 服务器交互。 我从java client api guide读到:

根据经验,线程之间共享 Channel 实例是应该避免的。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel。

我正在尝试使用 corePoolSize 1 的 ThreadPoolExecutor 并添加 Runnable 任务以将消息保存在 RabbitMQ 队列中。这是我正在使用的代码:

package common;

import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class RabbitMQUtil {
    private static Logger log= LoggerFactory.getLogger("logger");
    private static RabbitMQUtil gmInstance;
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
    private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";
    private final Properties properties = new Properties();
    private String host = null;
    private int port = 0;
    private String username = null;
    private String password = null;
    private String useSSL = "false";
    private ConnectionFactory factory;
    private Connection connection;
    private Channel channel;

    private RabbitMQUtil() throws IOException, TimeoutException, Exception {
        try {
            InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME);
            if(stream != null) {
                properties.load(stream);
            }
        } catch (Exception ex) {
            log.error("Exception while loading the rabbitmq properties file:", ex);
        }

        host = properties.getProperty("rabbitmq.host", "localhost");
        port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
        username = properties.getProperty("rabbitmq.username", "guest");
        password = properties.getProperty("rabbitmq.password", "guest");
        useSSL = properties.getProperty("rabbitmq.usessl", "false");

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        if("true".equalsIgnoreCase(useSSL)) {
            try {
                factory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                log.error("Exception while applying the tls for rabbitmq:", e);
            }
        }
        connection = factory.newConnection();
        connection.addBlockedListener(new RabbitMQBlockedListener());
        connection.addShutdownListener(new RabbitMQShutDownListener());

        channel = connection.createChannel();
    }

    public static RabbitMQUtil getInstance() {
        if(gmInstance == null) {
            synchronized (RabbitMQUtil.class) {
                if(gmInstance == null) {
                    try {
                        gmInstance = new RabbitMQUtil();
                    } catch (IOException | TimeoutException e) {
                        log.error("Exception in getInstance:", e);
                    } catch (Exception e) {
                        log.error("Exception in getInstance:", e);
                    }
                }
            }
        }
        return gmInstance;
    }

    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
        log.info("data to be saved for :"+queueName+" is:"+obj.toString());
    }

    public void saveMsgInQueue(JsonObject gson, String queueName) {
        this.executor.execute(new RabbitMQData(gson, queueName));
    }

    private class RabbitMQBlockedListener implements BlockedListener {
        @Override
        public void handleBlocked(String arg0) throws IOException {
            log.warn("blocked listener called:", arg0);
        }

        @Override
        public void handleUnblocked() throws IOException {
            log.warn("unblocked listener called:");
        }
    }

    private class RabbitMQShutDownListener implements ShutdownListener {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            log.error("Shutdown event listener called:", cause);
            log.error("shutdown event listener:"+cause.isHardError());
        }
    }

    private class RabbitMQData implements Runnable{
        JsonObject obj;
        String queueName;
        public RabbitMQData(JsonObject obj, String queueName) {
            Thread.currentThread().setName("RabbitMQ Thread:"+obj.get("userid")+" -->"+queueName);
            this.obj = obj;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            try {
                channel.queueDeclare(this.queueName, true, false, false, null);
                channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes());
            } catch (Exception e) {
                log.info("Error while running the scheduled rabbitmq task:", e);
                log.info("data to be saved for :"+this.queueName+" is:"+this.obj.toString());
            }
        }
    }

    public static void saveRabbitMQData(JsonObject obj, String queueName) {
        RabbitMQUtil util = RabbitMQUtil.getInstance();
        if(util != null) 
            util.saveMsgInQueue(obj, queueName);
        else
            RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
    }
}

我想知道以下几点:

  1. 仅使用 1 个线程的线程池时使用单个通道是否可以?
  2. 在触发阻塞/解除阻塞和关闭事件时应如何处理连接和通道对象?虽然当 RabbitMQ 服务器再次启动时 API 会自动建立连接。

任何其他反馈将不胜感激。

谢谢

【问题讨论】:

    标签: java multithreading rabbitmq threadpoolexecutor


    【解决方案1】:

    1.- 仅使用 1 个线程的线程池时使用单个通道是否可以?

    是的,没关系。这就是你应该这样做的方式。只有一个线程必须使用 Channel 实例。否则,确认可能会丢失(请参阅此处:https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html

    2.- 当阻塞/解锁和关闭事件被触发时,应该如何处理连接和通道对象?虽然当 RabbitMQ 服务器再次启动时 API 会自动建立连接。

    当应用程序关闭时,您应该关闭通道,然后关闭与 RabbitMQ 的连接。

        channel.close();
        conn.close();
    

    关于阻止/解除阻止,请在此处阅读 (https://www.rabbitmq.com/api-guide.html):

    对消费者的回调在线程池中调度,该线程池与实例化其 Channel 的线程分开。这意味着消费者可以安全地调用 Connection 或 Channel 上的阻塞方法,例如 Channel#queueDeclare 或 Channel#basicCancel。

    每个 Channel 都有自己的调度线程。对于每个通道一个消费者的最常见用例,这意味着消费者不会阻碍其他消费者。如果每个 Channel 有多个 Consumer,请注意长时间运行的 Consumer 可能会阻止向该 Channel 上的其他 Consumer 发送回调。

    【讨论】:

    • 感谢您的回复。我不得不在shutdownlistener 中使用connection.abort() 和channel.abort()。因为关闭导致异常,并且 connection = null 和 channel = null 在同一个侦听器中。在将消息发送到rabbitmq服务器之前,我正在创建新的连接和通道它们是空的。在阻塞的侦听器中使用相同的策略是一种好习惯吗?
    猜你喜欢
    • 2015-04-23
    • 2016-08-06
    • 1970-01-01
    • 1970-01-01
    • 2012-08-16
    • 2014-04-12
    • 2020-06-16
    • 2014-02-13
    • 2010-09-20
    相关资源
    最近更新 更多