【发布时间】:2017-11-02 12:56:59
【问题描述】:
我需要创建应该从数据生产者接收数据并处理它的应用程序。我选择 RabbitMQ 作为消息代理。我的测试显示不是最好的结果:
已发送 - 100 条消息;
产生 - 100 msg/s;
消耗 - 6 msg/s;
为了解决它,我设置了listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
但我需要确认一些队列。而且我不能使用工作线程并行执行,因为消息的顺序对于数据处理很重要。
确认后是否可以提高接收速度?
制片人:
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public FanoutExchange exchange() {
return new FanoutExchange("exchange-1");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange());
}
...
rabbitTemplate.setExchange("exchange-1");
rabbitTemplate.convertAndSend(data);
消费者:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
Queue queue() {
return new Queue("queue-1", false);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueues(queue());
listenerContainer.setMessageListener(new Receiver());
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return listenerContainer;
}
...
@Override
public void onMessage(Message message) {
System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis());
}
在具有 2 个 vCPU 和 4 Gb 内存的实例上测试。
【问题讨论】:
标签: java rabbitmq message-queue spring-amqp messagebroker