【问题标题】:Compressing and decompressing Spring RabbitMQ messages with a DirectMessageListenerContainer使用 DirectMessageListenerContainer 压缩和解压缩 Spring RabbitMQ 消息
【发布时间】:2018-10-17 17:30:44
【问题描述】:

我已从上一篇文章 (spring-rabbit JSON deserialization for ArrayList contents) 中修改了我的 RabbitMQ,现在使用带有 MessagePostProcessors 的 DirectMessageListener 来 GZip 和 GUnzip 消息负载。

但是,它似乎没有工作,因为断点没有被激活,还因为我的RabbitListeners 不再接收消息,而他们用SimpleMessageFactoryListenerContainer 接收消息。 此外,似乎仍在使用 SimpleMessageListenerContainer(?)。在旁注中,我正在自动装配DirectMessageListenerContainer,因此我可以动态设置我使用的队列。


弹簧兔:2.0.3.RELEASE。 spring-boot:2.0.1.RELEASE。


RabbitMQ 配置:

@Configuration
@EnableRabbit
public class MessagingConfiguration implements ShutdownListener {

    @Autowired
    private RabbitListenerEndpointRegistry registry;

    @Autowired
    private DirectMessageListenerContainer container;

    @Bean
    public DirectMessageListenerContainer messageListenerContainer(final ConnectionFactory connectionFactory) {
        final DirectMessageListenerContainer listenerContainer = new DirectMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageConverter(jsonConverter()); // i.e.@RabbitListener to use Jackson2JsonMessageConverter
        listenerContainer.setAutoStartup(false);
        // container.setErrorHandler(errorHandler);
        final MessageListenerAdapter messageListener = new MessageListenerAdapter(new Object() {
            @SuppressWarnings("unused")
            public String handleMessage(final String message) {
                return message.toUpperCase();
            }
        });
        messageListener.setBeforeSendReplyPostProcessors(new GZipPostProcessor());
        listenerContainer.setMessageListener(messageListener);
        listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
        return listenerContainer;
    }

    @EventListener(ApplicationDatabaseReadyEvent.class)
    public void onApplicationDatabaseReadyEvent() {
        log.info("Starting all RabbitMQ Listeners..."); //$NON-NLS-1$
        for (final MessageListenerContainer listenerContainer : registry.getListenerContainers()) {
            listenerContainer.start();
        }
        log.info("Register is running: {}", registry.isRunning()); //$NON-NLS-1$
        log.info("Started all RabbitMQ Listeners."); //$NON-NLS-1$
    }

    @Bean
    public List<Declarable> bindings() {
    final List<Declarable> declarations = new ArrayList<>();
        final FanoutExchange exchange = new FanoutExchange("fx", true, false);
        final Queue queue = QueueBuilder.durable("orders").build();
        declarations.add(exchange);
        declarations.add(queue);
        declarations.add(BindingBuilder.bind(queue).to(exchange));
        List<String> q = new ArrayList<>();
        q.add(queue.getName());
    container.addQueueNames(q.toArray(new String[queues.size()]));

        return declarations;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    private static DefaultJackson2JavaTypeMapper classMapper() {
        final DefaultJackson2JavaTypeMapper classMapper = new DefaultJackson2JavaTypeMapper();
        classMapper.setTrustedPackages("*"); //$NON-NLS-1$  //TODO add trusted packages
        return classMapper;
    }

    @ConditionalOnProperty(name = "consumer", havingValue = "true")
    @Bean
    public ConsumerListener listenerConsumer() {
        return new ConsumerListener();
    }

    @ConditionalOnProperty(name = "producer", havingValue = "true")
    @Bean
    public ProducerListener listenerProducer() {
        return new ProducerListener();
    }

    @Bean
    public RabbitAdmin rabbitAdmin(final CachingConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonConverter()); // convert all sent messages to JSON
        rabbitTemplate.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
        rabbitTemplate.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
        return rabbitTemplate;
    }

    @Override
    public void shutdownCompleted(final ShutdownSignalException arg0) {
    }
}

【问题讨论】:

    标签: java rabbitmq spring-rabbit


    【解决方案1】:

    那样不行,你不能为@RabbitListeners 自动装配容器;它们不是豆子;它们由容器工厂创建并在注册表中注册。相反,您必须从注册表中检索它们(通过 id)。

    但是,由于您已将 autoStartup 设置为 false,因此不应“窃取”来自您的 @RabbitListener 的消息。

    一般来说,调试日志应该会有所帮助。

    【讨论】:

    • 我刚刚添加了我丢失的配置,它在另一个事件时启动我的容器,这样:` for (final MessageListenerContainer listenerContainer : registry.getListenerContainers()) { listenerContainer.start(); }
    • 对,但这不会启动 messageListenerContainer @Bean - 那是一个 bean,而不是为 @RabbitListener 创建的。如果您希望将后处理器应用于带注释的侦听器,则必须将后处理器添加到侦听器容器工厂,而不是该容器 bean。看起来您可能会将 DirectMessageListenerContainerDirectRabbitListenerContainerFactory 混淆,后者用于为每个侦听器创建前者。
    • 已修复。我现在正在使用 DirectRabbitListenerContainerFactory 并在其上设置了我的后处理器。它们现在正在被使用。非常感谢加里。
    • Gary,我遇到了一个异常:Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8 ` at java.lang.StringCoding.decode(StringCoding.java:190) ~[?:1.8.0_152]` ` at java.lang.String .(String.java:426) ~[?:1.8.0_152]` ` 在 java.lang.String.(String.java:491) ~[?:1.8.0_152]` ` 在 org .springframework.amqp.support.converter.Jackson2JsonMessageConverter.convertBytesToObject(Jackson2JsonMessageConverter.java:215)` 在 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter.fromMessage(Jackson2JsonMessageConverter.java:179) `
    • 不要在 cmets 中放这样的东西;很难阅读。我建议您开始一个新问题,显示您当前的代码/配置。听起来好像未应用未压缩的 MPP。 Gzip MPP 在 contentEncoding 消息属性前加上 gzip: 前缀,而 Unzip MPP 去掉前缀。调用 JSON 转换器时前缀仍然存在的事实表明它没有解压缩。