【发布时间】:2018-10-17 17:30:44
【问题描述】:
我已从上一篇文章 (spring-rabbit JSON deserialization for ArrayList contents) 中修改了我的 RabbitMQ,现在使用带有 MessagePostProcessors 的 DirectMessageListener 来 GZip 和 GUnzip 消息负载。
但是,它似乎没有生效:断点没有被触发,而且我的 RabbitListeners 也不再接收消息,而之前使用 SimpleMessageFactoryListenerContainer 时它们是可以接收消息的。
此外,似乎仍在使用 SimpleMessageListenerContainer(?)。顺便说一下,我自动装配了 DirectMessageListenerContainer,以便可以动态设置所使用的队列。
spring-rabbit:2.0.3.RELEASE。spring-boot:2.0.1.RELEASE。
RabbitMQ 配置:
/**
 * Spring configuration for RabbitMQ messaging.
 *
 * <p>Declares a {@link DirectMessageListenerContainer} with GZip/GUnzip post processors, the
 * exchange/queue/binding declarations, a Jackson JSON message converter, a {@link RabbitAdmin}
 * and a {@link RabbitTemplate}. Listener containers are created with auto-startup disabled and
 * are started when an {@code ApplicationDatabaseReadyEvent} is received.
 *
 * <p>NOTE(review): the container bean declared here is a standalone container with its own
 * {@link MessageListenerAdapter}. This class does not configure a listener container factory,
 * so {@code @RabbitListener} endpoints are presumably still built by whatever factory is
 * configured elsewhere (and would not pick up the post processors set here) - confirm.
 */
@Configuration
@EnableRabbit
public class MessagingConfiguration implements ShutdownListener {

    /** Registry of the listener containers that are started once the database is ready. */
    @Autowired
    private RabbitListenerEndpointRegistry registry;

    /** The container declared by {@link #messageListenerContainer}; queues are added to it in {@link #bindings()}. */
    @Autowired
    private DirectMessageListenerContainer container;

    /**
     * Creates the direct listener container.
     *
     * <p>The container uses the JSON converter, does not start automatically, un-gzips received
     * messages and gzips replies produced by the inline upper-casing handler.
     *
     * @param connectionFactory the connection factory the container consumes from
     * @return the configured, not yet started container
     */
    @Bean
    public DirectMessageListenerContainer messageListenerContainer(final ConnectionFactory connectionFactory) {
        final DirectMessageListenerContainer listenerContainer = new DirectMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageConverter(jsonConverter()); // i.e.@RabbitListener to use Jackson2JsonMessageConverter
        // Started later by onApplicationDatabaseReadyEvent() (if registered) rather than at context refresh.
        listenerContainer.setAutoStartup(false);
        // TODO set an error handler on listenerContainer once one is available.

        final MessageListenerAdapter messageListener = new MessageListenerAdapter(new Object() {
            // Invoked reflectively by MessageListenerAdapter, hence "unused" for the compiler.
            @SuppressWarnings("unused")
            public String handleMessage(final String message) {
                return message.toUpperCase();
            }
        });
        messageListener.setBeforeSendReplyPostProcessors(new GZipPostProcessor());

        listenerContainer.setMessageListener(messageListener);
        listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
        return listenerContainer;
    }

    /**
     * Starts every listener container known to the registry once the database is ready.
     *
     * <p>NOTE(review): {@code log} is not declared in this class as shown; presumably it is
     * provided elsewhere (e.g. generated) - confirm.
     */
    @EventListener(ApplicationDatabaseReadyEvent.class)
    public void onApplicationDatabaseReadyEvent() {
        log.info("Starting all RabbitMQ Listeners..."); //$NON-NLS-1$
        for (final MessageListenerContainer listenerContainer : registry.getListenerContainers()) {
            listenerContainer.start();
        }
        log.info("Register is running: {}", registry.isRunning()); //$NON-NLS-1$
        log.info("Started all RabbitMQ Listeners."); //$NON-NLS-1$
    }

    /**
     * Declares the {@code fx} fanout exchange, the durable {@code orders} queue and the binding
     * between them, and registers the queue name with the autowired listener container.
     *
     * @return the exchange, queue and binding declarations
     */
    @Bean
    public List<Declarable> bindings() {
        final List<Declarable> declarations = new ArrayList<>();
        final FanoutExchange exchange = new FanoutExchange("fx", true, false);
        final Queue queue = QueueBuilder.durable("orders").build();
        declarations.add(exchange);
        declarations.add(queue);
        declarations.add(BindingBuilder.bind(queue).to(exchange));

        // addQueueNames takes varargs, so the single name is passed directly. The previous code
        // sized a temporary array from an undeclared "queues" variable.
        container.addQueueNames(queue.getName());
        return declarations;
    }

    /**
     * Creates the Jackson JSON converter shared by the listener container and the template.
     *
     * @return a converter using the class mapper from {@link #classMapper()}
     */
    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    /**
     * Builds the type mapper used by the JSON converter.
     *
     * <p>NOTE(review): every package is currently trusted ({@code "*"}); see the TODO below.
     *
     * @return a type mapper trusting all packages
     */
    private static DefaultJackson2JavaTypeMapper classMapper() {
        final DefaultJackson2JavaTypeMapper classMapper = new DefaultJackson2JavaTypeMapper();
        classMapper.setTrustedPackages("*"); //$NON-NLS-1$ //TODO add trusted packages
        return classMapper;
    }

    /**
     * Registers the consumer listener when the {@code consumer} property is {@code true}.
     *
     * @return a new consumer listener
     */
    @ConditionalOnProperty(name = "consumer", havingValue = "true")
    @Bean
    public ConsumerListener listenerConsumer() {
        return new ConsumerListener();
    }

    /**
     * Registers the producer listener when the {@code producer} property is {@code true}.
     *
     * @return a new producer listener
     */
    @ConditionalOnProperty(name = "producer", havingValue = "true")
    @Bean
    public ProducerListener listenerProducer() {
        return new ProducerListener();
    }

    /**
     * Creates the admin bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the admin operates on
     * @return a new {@link RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(final CachingConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates the template used for sending messages, with JSON conversion and three-second
     * reply and receive timeouts.
     *
     * @param connectionFactory the connection factory the template publishes through
     * @return the configured {@link RabbitTemplate}
     */
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonConverter()); // convert all sent messages to JSON
        rabbitTemplate.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
        rabbitTemplate.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
        return rabbitTemplate;
    }

    /**
     * Shutdown callback; no action is taken.
     *
     * @param arg0 the shutdown signal (ignored)
     */
    @Override
    public void shutdownCompleted(final ShutdownSignalException arg0) {
    }
}
【问题讨论】:
标签: java rabbitmq spring-rabbit