【发布时间】:2020-11-17 12:01:31
【问题描述】:
我正在尝试使用 RabbitMQ 和 Spring Boot 了解消息何时被接受 (ack) 或不被接受 (nack)。
我想将消息发送到队列(通过交换)并检查队列是否已接受消息。实际上我想发送到两个不同的队列,但这并不重要,我假设它是否适用于其中一个也适用于另一个。
所以我尝试过使用CorrelationData:
public boolean sendMessage(...) {
CorrelationData cd = new CorrelationData();
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
try {
return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
e.printStackTrace();
return false;
}
}
cd.getFuture().get(3, TimeUnit.SECONDS).isAck() 行应该得到false 的值,我认为该值还没有被ack 放入队列。但这始终是正确的,即使 routingKey 不存在。
所以我假设这段代码正在检查消息是否已发送到exchange 和exchange 说“是的,我已经收到了消息,它还没有被路由,但我已经收到了它”。
所以,我已经在 Rabbit/Spring 文档中寻找其他方法,但我找不到。
再解释一下,我想要的是:
进入 Spring Boot 代码,我收到一条消息。此消息必须发送到其他队列/交换,但在其他两个队列确认ack 之前,不能从当前队列中删除(即acked)。
我有手动确认,作为一个小伪代码,我有这个:
@RabbitListener(queues = {queue})
public void receiveMessageFromDirect(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag){
boolean sendQueue1 = sendMessage(...);
boolean sendQueue2 = sendMessage(...);
if(sendQueue1 && sendQueue2){
//both messages has been readed; now I can ack this message
channel.basicAck(tag, false);
}else{
//nacked; I can't remove the message util both queue ack the message
channel.basicNack(tag,false,true);
}
我已经测试过这个结构,即使队列不存在,sendQueue1 和 sendQueue2 的值也始终为真。
【问题讨论】:
标签: java spring-boot rabbitmq spring-rabbit