【问题标题】:Non de-batching messages in spring amqpspring amqp 中的非反批处理消息
【发布时间】:2017-11-07 18:25:38
【问题描述】:

我正在使用BatchingRabbitTemplate 将消息批量发送到 amqp 端点。现在,在另一个接收端,我可以使用@RabbitListener 接收消息,但我的问题是消息会自动取消批处理,所以我不能使用@RabbitHandler public void receive (List<SomeObject> so)。除了我这样做之外,有没有更简单的非分批处理消息的方法:

@RabbitListener(..., containerFactory = "nonDeBatchingContainerFactory")

@Bean
public RabbitListenerContainerFactory nonDeBatchingContainerFactory(){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDeBatchingEnabled(false);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    factory.setAfterReceivePostProcessors(new NonDeBatchingMessagePostProcessor(jackson2JsonMessageConverter()));
    return factory;
    }

然后实现这个后处理器(或多或少地复制现有代码以进行反批处理)。

public class NonDeBatchingMessagePostProcessor implements MessagePostProcessor {

    private MessageConverter payloadConverter;

    public NonDeBatchingMessagePostProcessor(MessageConverter payloadConverter) {
        this.payloadConverter = payloadConverter;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
        if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat)) {
            List<? super Object> aggregatedObjects = new ArrayList<>();
            ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            String singleObjectTypeId = messageProperties.getHeaders().get(DEFAULT_CLASSID_FIELD_NAME).toString();
            messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
            while (byteBuffer.hasRemaining()) {
                int length = byteBuffer.getInt();
                if (length < 0 || length > byteBuffer.remaining()) {
                    throw new ListenerExecutionFailedException("Bad batched message received",
                            new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
                            message);
                }
                byte[] body = new byte[length];
                byteBuffer.get(body);
                messageProperties.setContentLength(length);
                // Caveat - shared MessageProperties.
                Message fragment = new Message(body, messageProperties);
                Object singleObject = this.payloadConverter.fromMessage(fragment);
                aggregatedObjects.add(singleObject);
            }
            Message aggregatedMessages = this.payloadConverter.toMessage(aggregatedObjects, messageProperties);
            aggregatedMessages.getMessageProperties().getHeaders().put(DEFAULT_CONTENT_CLASSID_FIELD_NAME, singleObjectTypeId);
            return aggregatedMessages;
        }
        return null;
    }
}

我需要这个用例来批量接收兔子上的所有消息,然后在弹性搜索中进行批量索引。谢谢。

【问题讨论】:

    标签: java spring rabbitmq spring-amqp


    【解决方案1】:

    在生产应用程序级别进行批处理(发送List&lt;SomeObject&gt;)可能比使用批处理模板更容易。那么你就不需要消费者方面的任何东西了。

    【讨论】:

    • 这不应该是批处理模板的默认行为吗?然后根据方法签名(列表或单个对象)它可以确定处理方法?
    • 不幸的是,debatching 是在低得多的级别上完成的 - 侦听器容器对其MessageListener 一无所知 - 我们可以考虑在容器中添加一个标志“不要 debatch”和然后在MessagingMessageListenerAdapter 中添加与您类似的代码。随时打开new feature JIRA Issue。并且总是欢迎贡献:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-15
    • 2021-07-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多