【问题标题】:Resend messages after timeout超时后重新发送消息
【发布时间】:2019-12-06 09:53:08
【问题描述】:

我有一个放入 Spring AMQP 的对象列表。对象来自控制器。有一个服务可以处理这些对象。此服务可能会因 OutOfMemoryException 而崩溃。因此,我运行了该应用程序的多个实例。

有一个问题:当服务崩溃时,我丢失了收到的消息。我读到了 NACK。并且可以在出现异常或 RuntimeException 的情况下使用它。但是我的服务在错误中崩溃。因此,我无法发送 NACK。是否可以在 AMQP 中设置超时,如果我没有确认之前到达的消息,我会再次收到消息?

这是我写的代码:

public class Exchanges {
    public static final String EXC_RENDER_NAME = "render.exchange.topic";
    public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}

public class Queues {
    public static final String RENDER_NAME = "render.queue.topic";
    public static final Queue RENDER = new Queue(RENDER_NAME);
}

@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
    private final ApplicationEventPublisher eventPublisher;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
                                             exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
                                             key = "render.#")
    )
    public void onMessage(Message message, Channel channel) {
        String routingKey = parseRoutingKey(message);
        log.debug(String.format("Event %s", routingKey));
        RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
        handleMessage(queueObject);
    }
    public void handleMessage(RenderQueueObject render) {
        GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
        springEvent.setRender(true);
        eventPublisher.publishEvent(springEvent);
    }
}

这是发送消息的方法:

    @Async ("threadPoolTaskExecutor")
    @EventListener (condition = "# event.queue")
    public void start (GenericSpringEvent <RenderQueueObject> event) {
        RenderQueueObject renderQueueObject = event.getWhat ();
        send (RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
    try {
        rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
    } catch (JsonProcessingException e) {
        log.warn("Can't send event!", e);
    }
}

【问题讨论】:

    标签: java rabbitmq jms spring-amqp


    【解决方案1】:

    您需要关闭连接才能让消息重新排队。

    最好在 OOME 之后终止应用程序(当然,这会关闭连接)。

    【讨论】:

    • 加里,是的,它有效。一旦连接关闭,消息就会被复制到一个空闲实例。但如果我异步运行,它就会停止工作。
    • 但是,正如我所说,OOME 应该被认为是致命的,你应该停止应用程序 - 你需要弄清楚为什么你会得到 OOME。
    猜你喜欢
    • 2012-10-26
    • 2019-05-31
    • 1970-01-01
    • 2016-10-20
    • 2013-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多