【问题标题】:How to implement java production grade RabbitMQ consumer如何实现java生产级RabbitMQ消费者
【发布时间】:2014-08-13 07:44:26
【问题描述】:

我遇到了 RabbitMQ 工作队列实现的问题。我目前在 Tomcat 中运行它,并且我有以下类不断地监听队列中的新任务。但是一两天后,它突然表现得很奇怪,对象 DeliveryOK 返回 by channel.queueDeclare(taskQueueName, isDurable, false, false, null); 总是零。 (我在下面提到“当前 poolSize”的日志中打印出来)。

但在 Rabbit 管理员(./rabbitmqadmin 列表队列或 RabbitMq 管理员门户)中,它总是返回一个大于零的数字(例如队列中的 1267 条消息)。它不会减少到零直到我重新启动tomcat,下面的类只能检测到队列中实际上有一些消息。

最初我认为这个类以某种方式终止,但它能够使用那些新到达的消息。它不会消耗那些挂在队列中的 1267 条消息。例如队列中的消息 1267,在我重新启动 tomcat 之前不会被消费。

从下面的代码来看,是因为实现有问题还是有更好的方法来专门为 RabbitMQ 实现队列消费者?我已经阅读了相关的堆栈帖子(Producer/Consumer threads using a Queue),但我不确定它是否有帮助。

另外,下面的这个消费者实现真的不会在 RunTimeException 中存活吗?

MqConsumer 类:

@Service
public class MqConsumer implements Runnable{

private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);
private final int MAX_ALERT_THRESHOLD = 10000;

@Autowired
private AsynchSystemConnections asynchSystemConnections;
public MqConsumer(){

}

@PostConstruct
private void init() {
    (new Thread(new MqConsumer(asynchSystemConnections))).start();
}

public MqConsumer(AsynchSystemConnections asynchSystemConnections){
    this.asynchSystemConnections = asynchSystemConnections;
}

@Override
public void run() {
    logger.info("Execute Consumer instance...");

    while (true) { // infinite loop until it die due server restart
        boolean toSleep = consume(asynchSystemConnections);

        if (toSleep){
            logger.error("Sleeping for 1 second...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("", e);
            }
        }
    }
}
private boolean consume(AsynchSystemConnections asynchSystemConnections) {
    com.rabbitmq.client.Connection mqConnection = null;
    Channel mqChannel = null;
    DatasiftMq dMq = null;

    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(asynchSystemConnections.getMqServerHost());

        mqConnection = factory.newConnection();
        mqChannel = mqConnection.createChannel();
        //consumePushInteractionJob method will forward to AsynchTwService.consume(connection, channel, AsynchTwService.PUSH_INTERACTION_QUEUE )
        dMq = asynchSystemConnections.getAsynchService().consumePushInteractionJob(mqConnection, mqChannel);


        int poolSize = asynchSystemConnections.getAsynchService().getPushInteractionQueueSize();
        logger.info("Current poolSize: " + poolSize);

    } catch(NullPointerException e) {
        logger.error("", e);
        if (dMq != null) {

            try {
                logger.error("Removing JSON with" + dMq.getLogHeader(dMq));
                asynchSystemConnections.getAsynchService().ack(mqChannel, dMq.getDelivery());
                logger.error("Removed JSON with" + dMq.getLogHeader(dMq));
            } catch (IOException e1) {
                logger.error("Remove JSON Failed: ", e);
            }
        }
        return true;
    } catch (IOException e) {
        logger.error("Unable to create new MQ Connection from factory.", e);
        return true;
    } catch (InterruptedException e) {
        logger.error("", e);
        return true;
    } catch (ClassNotFoundException e) {
        logger.error("", e);
        return true;
    }  catch (Exception e) {
        logger.error("Big problem, better solve this fast!!", e);
        asynchSystemConnections.getNotificationService().notifySystemException(null, e);
        return true;    
    } finally {

        try {
            asynchSystemConnections.getAsynchService().ack(mqChannel, dMq.getDelivery());
            asynchSystemConnections.getAsynchService().disconnect(mqConnection, mqChannel);
        } catch (IOException e) {
            logger.error("", e);
        }
    }

    return false;
}

AsynchTwService 类:

@Service("asynchTwService")
public class AsynchTwService implements AsynchService {
static final String FAVOURITE_COUNT_QUEUE = "favourite_count_queue";
static final String FRIENDS_FOLLOWERS_QUEUE = "friends_followers_queue";
static final String DIRECT_MESSAGE_RECEIVE_QUEUE = "direct_message_receive_queue";
static final String PUSH_INTERACTION_QUEUE = "push_interaction_queue";

private static String mqServerHost;

private static final Logger logger = LoggerFactory.getLogger(AsynchTwService.class);
private static final boolean isDurable = true;
private boolean autoAck = false;

private ConcurrentHashMap<String, Integer> currentQueueSize = new ConcurrentHashMap<String, Integer>();

@Override
public Connection getConnection() throws IOException{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(mqServerHost); 

    return factory.newConnection();
}

@Override
public void produce(Connection connection, Channel channel, Object object, String taskQueueName) throws IOException {
    sendToQueue(connection, channel, object, taskQueueName);
}

@Override
public QueueItem consume(Connection connection, Channel channel, String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException{
    Serializer serializer = new Serializer();
    try {
        Delivery delivery = listenFromQueue(connection, channel, taskQueueName);
        Object messageObj = serializer.toObject(delivery.getBody());
        QueueItem queueItem = (QueueItem)messageObj;
        queueItem.setDelivery(delivery);
        return queueItem;
    } catch (InterruptedException e) {
        throw e;
    } catch (ClassNotFoundException e) {
        logger.error("Unable to serialize the message to QueueItem object", e);
        throw e;
    }
}

@Override
public int getQueueSize(String taskQueueName){
    return this.currentQueueSize.get(taskQueueName); 
}

private Delivery listenFromQueue(Connection connection, Channel channel, String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException{
    try {
        DeclareOk  ok = channel.queueDeclare(taskQueueName, isDurable, false, false, null);
        currentQueueSize.put(taskQueueName, ok.getMessageCount());
        logger.info("Queue ("+ taskQueueName + ") has items: " +ok.getMessageCount());
    } catch (IOException e) {
        // TODO Auto-generated catch block
    }

    logger.info(" [*] Consuming "+taskQueueName+" message...");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    try {
        channel.basicConsume(taskQueueName, autoAck, consumer);
    } catch (IOException e) {
        logger.error("", e);
    }

    try {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        return delivery;
    } catch (ShutdownSignalException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } catch (ConsumerCancelledException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } catch (InterruptedException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } 
}



private void sendToQueue(Connection connection, Channel channel, Object object, String taskQueueName) throws IOException{
    //Initialization, create Message Queue broker connection
    try{
        channel.queueDeclare(taskQueueName, isDurable, false, false, null);
    }catch(IOException e) {
        logger.error(e.getMessage());
        logger.error("Error create Message Queue connection for queue name:" + taskQueueName, e);
        throw e;
    }

    //send message to broker
    try {
        long start = System.currentTimeMillis();
        Serializer serializer = new Serializer();
        logger.info("Sending Twitter QueueItem to Message Queue...");

        channel.basicPublish("", taskQueueName, MessageProperties.PERSISTENT_TEXT_PLAIN, 
                serializer.toBytes(object)); 

        logger.info("Queue successfully sent, process took: " + (System.currentTimeMillis()-start)+ "ms");
    } catch (IOException e) {
        logger.error("Error while sending object to queue : " + taskQueueName, e);
        throw e;
    }
}

public static String getMqServerHost() {
    return mqServerHost;
}

public static void setMqServerHost(String mqServerHost) {
    AsynchTwService.mqServerHost = mqServerHost;
}

@Override
public void disconnect(Connection connection, Channel channel) throws IOException{
    try {
        if (channel != null){
            if (channel.isOpen()){
                channel.close();    
            }
        }
        if (connection != null){
            if (connection.isOpen()){
                connection.close(); 
            }
        }
        logger.debug("MQ Channel Disconnected");
    } catch (IOException e) {
        throw e;
    }
}

@Override
public void ack(Channel channel, QueueingConsumer.Delivery delivery) throws IOException {
    // this is made as another method call is to avoid Ack too fast un intentionally
    try {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        logger.info("[x] acked" );
    } catch (IOException e) {
        logger.error("Unable Acknowledge Queue Message", e);
        throw e;
    }
}

@Override
public DatasiftMq consumeDatasiftInteraction(Connection connection, Channel channel, 
        String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException {

    Serializer serializer = new Serializer();
    try {
        Delivery delivery = listenFromQueue(connection, channel, taskQueueName);
        Object messageObj = serializer.toObject(delivery.getBody());
        DatasiftMq dto = (DatasiftMq)messageObj;
        dto.setDelivery(delivery);
        return dto;
    } catch (InterruptedException e) {
        throw e;
    } catch (ClassNotFoundException e) {
        logger.error("Unable to serialize the message to DatasiftDTO object", e);
        throw e;
    }
}

@Override
public void reQueue(Channel channel, Delivery delivery) throws IOException {
    try {
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
        logger.info("[x] Nacked" );
    } catch (IOException e) {
        logger.error("Unable Acknowledge Queue Message", e);
        throw e;
    }       
}

}

【问题讨论】:

  • 不使用spring-amqp依赖有什么原因吗?
  • 您在代码中使用 basicQos 吗?有多少消费者连接到该队列?
  • @Zarathustra 我一开始没有使用 spring-amqp。它解决了我上面提到的问题吗?
  • @old_sound 不,我没有指定任何 QOS。基本上它是一个 FIFO,只有一个消费者连接到该队列。但是您确实提出了一个很好的问题,因为即使只有 1 个这样的对象,但有时当我在 ./rabbitmqadmin list queues 中查看它时,我可以看到在同一个队列名称下列出的多个消费者。这可能是潜在的根本原因吗?
  • @Reusable 至少它让事情变得更容易,例如 Spring 默认自动发送确认,它也为多个消费者提供了一个很好的架构。你能告诉我们你对AsynchSystemConnections的实现吗?

标签: java multithreading tomcat rabbitmq


【解决方案1】:

您似乎在这里缺少一些基础知识。

取自here 和我的一些代码。 在消费者线程之外建立连接:

//executed once
ConnectionFactory factory = new ConnectionFactory();

factory.setHost("someHost");
factory.setUsername("user");
factory.setPassword("pass");

Connection connection = factory.newConnection();

你必须在你的线程中做什么:

//Consumer - executed in a Thread
QueueingConsumer consumer = new QueueingConsumer(connection.createChannel());
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

while (!Thread.current().isInterrupted())) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...      
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

总的来说,我仍然建议您查看它完美集成的 spring-amqp 库。

【讨论】:

  • 是的!你已经明确了你的观点,这就是问题所在!该代码有效,并且 tomcat restart 确实设法等待该线程被优雅地杀死。同样在 spring-amqp 上,我设法查看它,你知道吗?我现在正在生产中使用弹簧兔!我什至不需要关心管理创建了多少连接等等。感谢您的推荐!
  • 只是为了让您知道:spring-rabbitmq 依赖项带来了一个较旧的rabbitmq 客户端依赖项,它不具备所有功能。如果您关心一些新功能,例如可恢复的连接,您应该看看它:rabbitmq.com/api-guide.html#recovery 在某个地方也有一个客户端的变更日志站点......现在找不到......
  • 呃哦..你是说spring-rabbit没有启用自动恢复?我以前没有看到这种情况发生,因为我遵循 rabbitmq.com api-guide 上的代码。如果您发现任何关于它的信息,请告诉我。注意:到目前为止,prod 环境非常好。
  • 我的设置是在一个带有 ha 队列的集群中的 3 个 rabbitmq 节点,没有什么特别的(没有 FREAKING CLUSTERIP)所有客户端都知道所有 3 个 rabbitmq ip,如果一个无法访问,他们会使用另一个(保持简单) .不确定,但我认为自动恢复的默认值为 false。还要检查ConnectionFactory 中的超时。 Sicne 3.x 在连接管理方面有了很大的改进。 Currenty spring-rabbit 当前使用的是 3.3.1,但 amqp-client 的最新版本是 3.3.4。
猜你喜欢
  • 1970-01-01
  • 2013-03-25
  • 1970-01-01
  • 2018-08-26
  • 2011-05-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多