【发布时间】:2018-04-10 15:34:30
【问题描述】:
我是新手 RabbitMQ java 客户端。 我的问题:我创建了 10 个消费者并将它们添加到队列中。每个消费者使用 10 秒来处理我的流程。我检查了兔子的页面,我看到我的队列有 4000 条消息没有发送给客户端。我检查了日志客户端,结果是为一位消费者获取了一条消息,10 秒后我为一位消费者获取了一条消息,依此类推..我想同时为所有消费者获取 10 条消息(10 条消息-当时 10 条消费者进程) 请帮助我,我没有找到问题的解决方案。 非常感谢。
while (!isRetry) {
try {
isRetry = true;
connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
String queueName = "webhook_customer";
String exchangeName = "webhook_exchange";
String routingKey = "customer";
System.out.println("step2");
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(1);
for (int i = 0; i < numberWorker; i++) {
Consumer consumer = new QueueingConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long startProcess = System.nanoTime();
JSONObject profile = null;
try {
} catch (IOException ioe) {
handleLogError(profile, ioe.getMessage().toString());
} catch (Exception e) {
handleLogError(profile, e.getMessage());
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
long endProcess = System.nanoTime();
_logger.info("===========######### TIME PROCESS + " + (endProcess - startProcess) + " Nano Seconds ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
}
}
};
channel.basicConsume(queueName, false, consumer);
}
System.out.printf("Start Listening message ...");
} catch (Exception e) {
System.out.println("exception " + e.getMessage());
isRetry = closeConnection(connection);
e.printStackTrace();
} finally {
}
if (!isRetry) {
try {
System.out.println("sleep waiting retry ...");
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//END
}
【问题讨论】:
-
您能否发布一个代码示例,说明您如何声明您的消费者。你是用
channel.basicConsume还是其他方式? -
我在帖子中更新了我的代码,请检查并帮助我。非常感谢