【问题标题】:Processing message from rabbitmq at specified rate以指定速率处理来自 rabbitmq 的消息
【发布时间】:2016-11-06 07:26:23
【问题描述】:

我们一直在尝试让监听器以 1 msg/2 秒的特定速率从 rabbitmq 读取消息。到目前为止,我们还没有发现 rabbit mq 的任何此类实用程序。因此考虑使用 DB 执行此操作,即侦听器将读取消息并将其存储到 DB 中,稍后调度程序将以所需的速率从 DB 处理。如果有任何更好的方法可以做到这一点,请提出建议。我们正在 Spring 中开发我们的应用程序。提前致谢。

【问题讨论】:

    标签: rabbitmq spring-amqp throttling spring-rabbit


    【解决方案1】:

    你不能用监听器来做,但你可以用RabbitTemplate来做......

    @SpringBootApplication
    public class So40446967Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So40446967Application.class, args);
            RabbitAdmin admin = context.getBean(RabbitAdmin.class);
            AnonymousQueue queue = new AnonymousQueue();
            admin.declareQueue(queue);
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            for (int i = 0; i < 10; i++) {
                template.convertAndSend(queue.getName(), "foo" + i);
            }
            String out = (String) template.receiveAndConvert(queue.getName());
            while (out != null) {
                System.out.println(new Date() + " " + out);
                Thread.sleep(2000);
                out = (String) template.receiveAndConvert(queue.getName());
            }
            context.close();
        }
    
    }
    

    当然,您可以使用更复杂的东西,例如任务调度程序或 Spring @Async 方法,而不是休眠。

    【讨论】:

      【解决方案2】:

      灵感来自 Gary Russel 的回答:

      您可以使用更复杂的东西,例如任务调度程序或 Spring @Async

      您还可以得到每分钟确定的消息数量,并模拟相同的限速:

      private final RabbitTemplate rabbitTemplate;
      
      @Scheduled(fixedDelay = 60000) // 1 minute
      public void read() {
      
          List<String> messages = new ArrayList<>();
          String message = getMessageFromQueue();
          while(message != null && messages.size() < 30) { // 30 messages in 1 minute = 1 msg / 2 seconds
              messages.add(message);
              message = getMessageFromQueue();
          }
      
          public String getMessageFromQueue() {
              return (String) rabbitTemplate.receiveAndConvert(QUEUE_NAME);
          }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-01-30
        相关资源
        最近更新 更多