【问题标题】:RabbitMQ slow receiving speedRabbitMQ接收速度慢
【发布时间】:2017-11-02 12:56:59
【问题描述】:

我需要创建应该从数据生产者接收数据并处理它的应用程序。我选择 RabbitMQ 作为消息代理。我的测试显示不是最好的结果:

已发送 - 100 条消息;
产生 - 100 msg/s;
消耗 - 6 msg/s;

为了解决它,我设置了listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE); 但我需要确认一些队列。而且我不能使用工作线程并行执行,因为消息的顺序对于数据处理很重要。

确认后是否可以提高接收速度?

制片人:

@Bean
Queue queue() {
    return new Queue(queueName, false);
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
    connectionFactory.setUsername("name");
    connectionFactory.setPassword("pswd");
    return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
}

@Bean
public FanoutExchange exchange() {
    return new FanoutExchange("exchange-1");
}

@Bean
public Binding binding(){
    return BindingBuilder.bind(queue()).to(exchange());
}

...

rabbitTemplate.setExchange("exchange-1");
rabbitTemplate.convertAndSend(data);

消费者:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
    connectionFactory.setUsername("name");
    connectionFactory.setPassword("pswd");
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
}

@Bean
Queue queue() {
    return new Queue("queue-1", false);
}

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory());
    listenerContainer.setQueues(queue());
    listenerContainer.setMessageListener(new Receiver());
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return listenerContainer;
}

...

@Override
public void onMessage(Message message) {
    System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis());
}

在具有 2 个 vCPU 和 4 Gb 内存的实例上测试。

【问题讨论】:

    标签: java rabbitmq message-queue spring-amqp messagebroker


    【解决方案1】:

    可以增加容器的prefetchCount,会大大提高性能。但是,如果您拒绝并重新排队消息,则会丢失排序(重新排队的消息将落后于预取的消息)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-05-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-05-04
      • 2020-06-17
      • 2017-03-15
      相关资源
      最近更新 更多