【问题标题】:How to remove consumed messages from activemq?如何从activemq中删除已消费的消息?
【发布时间】:2019-12-27 08:19:36
【问题描述】:

我需要在消费后立即从activemq中删除消息。例如,我向队列发送一条消息,然后使用它,我需要将它从队列中删除。我在这里使用了一个消息存储和一个 clear() 方法。消息被添加到队列中,并且仅从消息存储中删除,而不是从队列中删除。我需要一种从队列中删除消息的方法。感谢您的帮助!

我试过下面的代码。

制作人

@Component
public class JmsProducer {

    @Autowired
    JmsTemplate jmsTemplate;
    
    @Value("${gkz.activemq.queue}")
    String queue;
    
    public void send(Customer customer){
        jmsTemplate.convertAndSend(queue, customer);
    }
}

消费者

@Component
public class JmsConsumer {
    @Autowired
    private MessageStorage customerStorage;

    @JmsListener(destination = "${gkz.activemq.queue}",containerFactory="jsaFactory")
    public void receive(Customer customer){
        System.out.println("Recieved Message: " + customer);
        customerStorage.add(customer);
    }
}

控制器

@PostMapping(value="/api/customer")
public Customer postCustomer(@RequestBody Customer customer){
    jmsProducer.send(customer);
    return customer;
}

@GetMapping(value="/api/customers")
public List<Customer> getAll(){
    List<Customer> customers = customerStorage.getAll();
    return customers;
}

@DeleteMapping(value="/api/customers/clear")
public String clearCustomerStorage() {
    customerStorage.clear();
    return "Clear All CustomerStorage!";
}

消息存储

public class MessageStorage {
    private List<Customer> customers = new ArrayList<>();

    public void add(Customer customer) {
        customers.add(customer);
    }

    public void clear() {
        customers.clear();
    }

    public List<Customer> getAll(){
        return customers;
    }
}

连接工厂配置

@Configuration
public class ConnectionFactoryConfiguration {
    
    @Value("${gkz.activemq.broker.url}")
    String brokerUrl;
    
    @Value("${gkz.activemq.borker.username}")
    String userName;
    
    @Value("${gkz.activemq.borker.password}")
    String password;
    
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
    
    
    //Used for Receiving Message
    @Bean
    public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(jacksonJmsMessageConverter());
        configurer.configure(factory, connectionFactory);
        return factory;
    }
 
    
    //Used for Sending Messages.
    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setMessageConverter(jacksonJmsMessageConverter());
        template.setConnectionFactory(connectionFactory());
        return template;
    }
    
}

整个代码在https://grokonez.com/java-integration/distributed-system/activemq-producer-consumer-springboot-restapis-example中可用

【问题讨论】:

  • 这是队列的目的。你真的看到你的消息在消费后没有被删除吗?
  • 您在@JmsListener 中定义的jsaFactory 怎么样?你在那里覆盖了什么样的配置?你可以在你的问题中添加它吗?
  • 你可以不使用任何自定义 containerFactory 吗?你也有@EnableJms 注释吗?你确定你在消费消息吗?你看到那些System.out.println("Recieved Message: " + customer);了吗?
  • 你坚持你的消息吗?默认情况下,我认为消息在消费时会被直接删除。指标只有一个 dequeued 计数器
  • 您显示的只是指标!它用于监视队列并查看消息是否被正确使用。消息在队列中消费后会被删除,这就是队列的重点,你不需要做任何事情。

标签: java spring-boot activemq


【解决方案1】:

单击 MQ 上的清除选项稍后重新启动 MQ 服务。这将清除所有未处于待处理状态的消息。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-04-02
    • 2020-01-17
    • 1970-01-01
    • 1970-01-01
    • 2015-11-24
    • 2020-12-01
    • 2015-07-16
    • 2015-11-25
    相关资源
    最近更新 更多