【问题标题】:Unsubscribe durable subscribers with ActiveMQ使用 ActiveMQ 取消订阅持久订阅者
【发布时间】:2017-01-24 17:05:57
【问题描述】:

我正在尝试取消订阅 TOPICS 的持久订阅者。

我的应用是一种社交网络:每个用户都是其他用户的话题。因此,每次用户在做某事时,都会通知他的朋友。当然,订阅者可能会取消订阅某个主题,不想再收到有关用户的通知。

每次我尝试取消订阅某个主题的订阅者时,都会收到一条错误消息,告诉我:“javax.jms.JMSException: Durable consumer is in use

这是我的 2 个类,SENDER 一个和 RECEIVER 一个。谁能告诉我我做错了什么??

SENDER 类:

package com.citizenweb.classes;

import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;

import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;

public class Sender {

    private ActiveMQConnectionFactory factory = null;
    private ActiveMQConnection connection = null;
    private ActiveMQSession session = null;
    private Destination destination = null;
    private MessageProducer producer = null;

    public Sender() {
    }

    public void connect(){
        try{
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
            factory.setTrustAllPackages(true);
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e){
            e.printStackTrace();
        }
    }

    public void sendPost(UserIF user,PostIF post) {
        if(session==null){connect();}
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            ObjectMessage postMessage = session.createObjectMessage();
            postMessage.setObject(post);
            producer.send(postMessage);
            System.out.println("\n SENDER Object message sent");



        } catch (MessageFormatException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendInformation(UserIF user,String info){
        if(session==null){connect();}
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            TextMessage infoMessage = session.createTextMessage();
            infoMessage.setText(info);
            producer.send(infoMessage);
            System.out.println("\n SENDER Information message sent");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        UserIF u1, u2, u3;
        String[] nom = new String[5];
        String[] prenom = new String[5];
        String[] login = new String[5];
        String[] password = new String[5];
        Date[] naiss = new Date[5];
        String[] mail = new String[5];
        for (int i = 0; i < 5; i++) {
            nom[i] = "nom_" + i;
            prenom[i] = "prenom_" + i;
            login[i] = "login_" + i;
            password[i] = "password_" + i;
            naiss[i] = new Date();
            mail[i] = "mail_" + i;
        }

        System.out.println("\n SENDER AFFECTATION DES NOMS");
        u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
        u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
        u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);


        Sender sender = new Sender();

        sender.sendInformation(u1, "U1 notification");
        sender.sendInformation(u2, "U2 notification");
        sender.sendInformation(u3, "U3 notification");
        //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0);
        //sender.sendPost(user, post);
        sender.session.close();
        sender.connection.close();

    }

}

接收器类:

package com.citizenweb.classes;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.region.Destination;
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;
import com.citizenweb.classes.Post;

public class Receiver implements MessageListener, Serializable {

    private static final long serialVersionUID = 1L;
    private ActiveMQConnectionFactory factory = null;
    private ActiveMQConnection connection = null;
    private ActiveMQSession session = null;
    private Topic destination = null;
    private MessageConsumer consumer = null;

    UserIF userTopic = new User();
    UserIF userSubscriber = new User();
    List<Message> listeMsg = new ArrayList<Message>();

    public Receiver(UserIF subscriber) {
        this.userSubscriber = subscriber;
    }

    public void connect() {
        try {
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
            factory.setTrustAllPackages(true);
            connection = (ActiveMQConnection) factory.createConnection();
            // ClientID :
            // https://qnalist.com/questions/2068823/create-durable-topic-subscriber
            connection.setClientID(userSubscriber.toString());
            connection.start();
            session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void receiveMessage(UserIF topic) {
        try {
            if (session == null) {
                connect();
            }
            destination = session.createTopic(topic.toString());
            String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
            //String nomAbonnement = userSubscriber.toString();
            consumer = session.createDurableSubscriber(destination, nomAbonnement);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void unsubscribe(UserIF topic) {
        try {
            if (session == null) {
                connect();
            }
            System.out.println("\n RECEIVER Désinscription du topic " + topic.toString());
            //consumer.close();
            String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
            //String nomAbonnement = userSubscriber.toString();
            System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement);
            session.unsubscribe(nomAbonnement);
            System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString());
        listeMsg.add(message);
        System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size());
        String classe = message.getClass().getSimpleName();
        System.out.println("\n RECEIVER Classe de message : " + classe);
        try {
            if (message instanceof TextMessage) {
                TextMessage text = (TextMessage) message;
                System.out.println("\n RECEIVER Information : " + text.getText());
            }
            if (message instanceof ObjectMessage) {
                System.out.println("\n RECEIVER ObjectMessage");
                ObjectMessage oMessage = (ObjectMessage) message;
                if (oMessage.getObject() instanceof PostIF) {
                    PostIF post = (PostIF) oMessage.getObject();
                    String s = ((Post) post).getCorpsMessage();
                    System.out.println("\n RECEIVER Post : " + s);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {

        /*
         * EACH USER IS A TOPIC FOR OTHER USERS
         * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
        */

        //CREATE USER
        UserIF u1, u2, u3;
        String[] nom = new String[5];
        String[] prenom = new String[5];
        String[] login = new String[5];
        String[] password = new String[5];
        Date[] naiss = new Date[5];
        String[] mail = new String[5];
        for (int i = 0; i < 5; i++) {
            nom[i] = "nom_" + i;
            prenom[i] = "prenom_" + i;
            login[i] = "login_" + i;
            password[i] = "password_" + i;
            naiss[i] = new Date();
            mail[i] = "mail_" + i;
        }

        u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
        u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
        u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);

        /*
         * MAKE EACH USER A SUBSCRIBER
         */
        Receiver receiver1 = new Receiver(u1);
        Receiver receiver2 = new Receiver(u2);
        Receiver receiver3 = new Receiver(u3);

        /*
         * PUT A MESSAGE LISTENER FOR EACH USER
         */
        receiver1.receiveMessage(u2);
        receiver1.receiveMessage(u3);
        receiver2.receiveMessage(u1);
        receiver2.receiveMessage(u3);
        receiver3.receiveMessage(u1);
        receiver3.receiveMessage(u2);

        /*
         * CALL THE SENDER CLASS TO SEND MESSAGES
         */
        try {
            Sender.main(args);
        } catch (Exception e1) {
            e1.printStackTrace();
        }

        /*
         * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE
         * CAN BE REMOVE
         */
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return;
        }

        /*
         * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS
         */
        receiver1.unsubscribe(u2);
        receiver1.unsubscribe(u3);
        receiver2.unsubscribe(u1);
        receiver2.unsubscribe(u3);
        receiver3.unsubscribe(u1);
        receiver3.unsubscribe(u2);
    }

}

【问题讨论】:

    标签: java jms activemq jms-topic durable-subscription


    【解决方案1】:

    如果当前没有活动订阅者正在使用它,您只能取消订阅持久订阅。看起来您的代码创建了多个订阅并且不会停止消费者,因此取消订阅当然会失败,如果您关闭消费者然后取消订阅,您应该会得到您正在寻找的结果。

    持久订阅取消订阅的一个例子是here

    【讨论】:

    • 嗨,蒂姆,感谢您的帮助。当您说“如果当前没有活动订阅者正在使用持久订阅,您只能取消订阅持久订阅”,您似乎在谈论一个主题。我的问题是一个主题可能有许多持久订阅者。其中一个订阅者可能想要取消订阅该主题,并且没有理由停止对同一主题的其他订阅者的订阅我想停止订阅一个主题而不杀死该主题本身这就是我想要对取消订阅执行的操作( ) 我的 Receiver 类的方法不可能吗?
    • 这不是我说的,请仔细阅读。处于活动状态的 Topic 订阅无法取消订阅,您需要关闭正在使用您要取消订阅的订阅的消费者。
    • OK Tim 所以如果我按照我的代码所示执行 session.unsubscribe(durableID) ,这应该停止给定订阅者的订阅,但前提是我之前已经完成了 consumer.close() ?我已经尝试了很多,但总是收到相同的错误消息:正在使用持久消费者我应该做的更多:'consumer.close();' 'session.unsubscribe(nomAbonnement);'
    • 如果我有 2 个名为“Topic A”和“TopicB”的主题,以及每个主题“U1”、“U2 和“U3”的 3 个持久订阅者,我应该怎么做才能停止TopicA 的“U1”订阅? session.unsubscribe(durableID) 语句必须指定“durableID”,它是订阅者和订阅的唯一标识符。如果“U1”订阅了 3 个主题,则每个主题的 DurableID 应该不同.在我的例子中,我的DurableID是这样创建的:“Topic1->U1”。用户2将有“Topic1->U2”用于相同的主题,而“Topic2->U2”用于主题#2。我的代码有什么问题?
    • 恐怕我无法调试您的代码,最好的办法是使用代理工具(例如 Web 控制台或 JMX)来验证您在取消订阅时是否没有活动订阅者跨度>
    【解决方案2】:

    每个连接都需要唯一的 ClientIDconnection.setClientID("clientID");

    我的错误是误解了给定客户的这种独特性。

    当客户端订阅一个 Topic 时,该 Topic 有一个连接。因此,对于订阅 3 个主题的给定客户端(例如),需要 3 个 ClientID,因为需要 3 个连接。 ClientID 必须是唯一的,因为它为一个主题标识了一个客户端的一个连接。

    这就是为什么当我想结束订阅时,有这么多JMSException 告诉我“正在使用耐用消费者”。

    【讨论】:

      猜你喜欢
      • 2015-08-24
      • 2017-02-20
      • 2014-10-12
      • 2016-01-14
      • 2017-11-08
      • 2017-10-31
      • 2015-06-16
      • 2015-08-11
      • 2012-05-30
      相关资源
      最近更新 更多