【发布时间】:2019-12-08 20:29:37
【问题描述】:
我需要创建一个 RabbitMQ 消息传递系统,将消息发送给特定用户,其中一条消息总是针对一个用户。
每个用户都应该有自己的动态创建的消息队列和 DLQ。
当用户拒绝一条消息时,它应该被移动到他的 DLQ,在那里它将等待 10 秒,然后返回到常规队列。
每个部分都在工作,除了消息在接受或拒绝时不会从常规队列中删除。
配置:
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=true
spring.rabbitmq.listener.simple.retry.enabled=false
消息服务:
@Service
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
public class RabbitService {
public static final String EXCHANGE_NAME = "my-exchange";
public static final String QUEUE_NAME_PREFIX = "my-queue.";
private final RabbitTemplate rabbitTemplate;
private final AmqpAdmin amqpAdmin;
@Autowired
public RabbitService(
RabbitTemplate rabbitTemplate,
AmqpAdmin amqpAdmin
) {
this.rabbitTemplate = rabbitTemplate;
this.amqpAdmin = amqpAdmin;
}
/**
* Initializes exchange with name {@link EXCHANGE_NAME} if it does not exist.
*/
@PostConstruct
public void init() {
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
amqpAdmin.declareExchange(exchange);
}
/**
* Sends the {@param message} to user with id {@param userId}.
*/
public void send(@NotNull String message, long userId) {
String queueName = QUEUE_NAME_PREFIX + userId;
declareQueuesIfNecessary(queueName);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, queueName, message);
}
/**
* Declares a new {@link Queue} with the specified {@param queueId} and a DLQ if they do not exist.
*
* @param queueId the queue identifier
*/
private void declareQueuesIfNecessary(@NotNull String queueName) {
String resendQueueName = "resend." + queueName;
Map<String, Object> args;
args = new HashMap<>();
// Args for the resend queue. It should have a TTL of 10 seconds, after which it will be moved to the regular queue.
args.put("x-message-ttl", 10000L);
args.put("x-dead-letter-exchange", EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", queueName);
amqpAdmin.declareQueue(new Queue(resendQueueName, true, false, true, args));
amqpAdmin.declareBinding(new Binding(
resendQueueName,
Binding.DestinationType.QUEUE,
EXCHANGE_NAME,
resendQueueName,
null
));
// Args for the regular queue. When messages are rejected, they should move to the resend queue.
args = new HashMap<>();
args.put("x-dead-letter-exchange", EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", resendQueueName);
amqpAdmin.declareQueue(new Queue(queueName, true, false, true, args));
amqpAdmin.declareBinding(new Binding(
queueName,
Binding.DestinationType.QUEUE,
EXCHANGE_NAME,
queueName,
null
));
}
}
消费者:
@Component
public class SimpleRabbitConsumer {
/**
* Consumes messages for all users and prints them if they are not blank, otherwise throws an exception.
*@implNote - in this example we don't need the user id
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = RabbitService.EXCHANGE_NAME, type = "topic"),
key = RabbitService.QUEUE_NAME_PREFIX + "*"
))
public void consume(@NotNull String message) {
// Let's reject the message it it's blank
if (StringUtils.isBlank(message)) {
throw new RuntimeException("Message rejected");
}
System.out.println("Message '" + message + "' received.");
}
}
什么不起作用
当消费者抛出RuntimeException时,消息不会被拒绝,而是会留在常规队列中。
我正在使用 RabbitMQ 管理器插件来查看有关队列的信息。
即使正确使用,消息也会保留在那里。
什么有效
当RabbitService#send(...) 方法被调用时,它正确地为指定的用户创建了常规队列和死信队列,但他还没有这些队列。
然后,SimpleRabbitConsumer#consume(...) 会正确接收该消息,然后将其打印出来或引发异常。
如果使用 RabbitMQ 管理器确认消息,则它会正确地从队列中删除。
当消息被拒绝(没有重新排队)时,它会被正确地发送到 DLQ,它会在 DLQ 中停留 10 秒,然后再被移回。
我的尝试
我已将 RuntimeException 更改为 AmqpRejectAndDontRequeueException,但对于 spring.rabbitmq.listener.simple.acknowledge-mode auto 和 manual,它的行为仍然相同。
在使用手动确认时,我还尝试将消费者方法更改为:
public void consume(
@NotNull String message,
@NotNull Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag
) throws IOException {
// Let's reject the message it it's blank
if (StringUtils.isBlank(message)) {
channel.basicReject(tag, false);
} else {
System.out.println("Message '" + message + "' received.");
channel.basicAck(tag, false);
}
}
无济于事。
其他问题
当消费者关闭时队列中有未处理的项目,那么当消费者重新启动时,消息将不会发送给消费者。
我知道解释很长,但是我花了很长时间浏览互联网寻找解决方案没有找到任何解决方案,所以我来到了这里。
我非常欢迎任何有关如何使其发挥作用的建议。
谢谢。
编辑
在调试时,我发现当应用程序启动时,会自动创建一个队列 Q 用于绑定“my-queue.*”的交换。
还创建了一个通道,它连接到消费者和队列 Q,而用户队列没有消费者。
我发送的任何消息似乎都发送到队列 Q 和用户队列,但它只从队列 Q 中删除。
日志如下:
2019-07-31 13:51:54.411 WARN 130172 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.myproject.SimpleRabbitConsumer.consume(java.lang.String)' threw exception
...
Caused by: java.lang.RuntimeException: Message rejected
2019-07-31 13:51:54.456 DEBUG 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Rejecting messages (requeue=false)
2019-07-31 13:51:54.456 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : AMQChannel(amqp://guest@127.0.0.1:5672/,1) channel.basicNack([1, true, false])
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@7f5ea4e8: tags=.......
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@7f5ea4e8: tags=.......
可能是当我向 id 为 1 的用户发送消息时,消息通过主题发送到绑定“my-queue.*”和“my-queue.1”,但消费者只处理“my-queue.*”而不是“my-queue.1”?
【问题讨论】:
-
@ArnaudClaudel 你提到的问题是关于消息没有被添加回队列,但我的问题是它们没有被删除。不过,我尝试使用指定的更改。
当使用acknowledgemode=AUTO和defaultRequeueRejected=false时,无论消费者是否抛出异常,都不会发生任何变化。 IE。消息仍然停留在队列中。AmqpRejectAndDontRequeueException也是如此。我将在日志中添加单独的评论。
标签: java spring-boot rabbitmq