【问题标题】:RabbitMQ, headers exchange, messages not routed by headers x-match = allRabbitMQ,标头交换,未按标头路由的消息 x-match = all
【发布时间】:2020-08-30 21:59:45
【问题描述】:

我正在尝试设置与队列的标头交换,在该队列中,消息基于收件人标头进行路由。

交换是标头类型。

到目前为止,该类能够连接到交换器并将消息提供给队列。 它还能够订阅队列并接收消息。每当订阅者的连接被取消时,它也会关闭连接。

目前的问题是邮件不是按收件人的标头值路由的。

给定以下类:

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {

private final ConnectionFactory connectionFactory;
private Channel channel;


public MyQueue(ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {

        Map<String, Object> headers = new HashMap<>();
        headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                .priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
                .headers(headers).build();

        log.info("Sending message to {}", headers);

        channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
                message.getMessage().getBytes(StandardCharsets.UTF_8));

        log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
        return "ok";
    } catch (TimeoutException e) {
        log.error("Rabbit mq timeout", e);
    } catch (IOException e) {
        log.error("Rabbit mq io error", e);
    }
    throw new UndeliverableMessageException();
}

public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
    Connection connection = connectionFactory.newConnection();
    this.channel = connection.createChannel();

    // The map for the headers.
    Map<String, Object> headers = new HashMap<>();
    headers.put("x-match", "all");
    headers.put(RabbitMqConfig.MATCH_HEADER, recipient);

    final String[] consumerTag = new String[1];
    Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
        {
            try {
                log.info("Binding to {}", headers);
                channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
                        headers);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
                            delivery.getProperties().getHeaders());

                    sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
                    //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };

                consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
                        true, deliverCallback, tag -> {
                            sink.complete();
                        });

            } catch (IOException e) {
                log.error("RabbitMQ IOException ", e);
            }
        }

    });

    return as.doOnCancel(() -> {
        try {
            if (consumerTag[0] == null) {
                log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
                channel.close();
                return;
            }
            channel.basicCancel(consumerTag[0]);
            channel.close();
            log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
        } catch (IOException | TimeoutException e) {
            log.error("RabbitMQ channel close error", e);
        }
    });
}

interface MessageListener<T> {

}
}

交换由以下调用声明

channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);

绑定收件人日志:

Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}

使用 x-match 绑定 3 个收件人:

但是,消息不匹配,好像它们是随机路由的

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

为什么x-match: all 不匹配?

【问题讨论】:

  • 我只是在非常表面的意义上使用了RabbitMQ,但是你能提供交换声明代码吗?
  • @Gryphon 当然,我已将呼叫添加到问题中
  • 所以从我在文档和here 中看到的内容来看,RabbitMQ 不会在消费者端按标头过滤消息。标头绑定用于路由(而不是路由键)到不同的队列。因此,您需要为 mary 排队,为 james 排队,为 john 排队。服务器将根据标头路由到队列,消费者将从各自的队列中读取。
  • 好的,这样测试过,它可以通过基于收件人匹配的路由来按预期工作。我以为路由会发生在队列级别,但它实际上发生在交换级别,如果您提供代码 sn-p,我可以将您的答案标记为已接受
  • 如果您已经有测试 sn-p,请继续发布(特别是如果它适合您)并接受作为您自己的答案。由于我目前没有办法对其进行测试,因此最好是正确的,而不是我的猜测。

标签: java rabbitmq


【解决方案1】:

在阅读了@Gryphon 发布的评论后,在订阅者方面,我最终为每个参与者创建了一个队列。

channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
    true,
    false,
    false,
    null)

在发布者端,代码保持不变,消息被发送到交换器,交换器将根据x-match: all配置处理路由,将消息路由到适当的队列。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-11
    • 2015-05-03
    • 2017-10-07
    • 2019-09-24
    • 2018-06-24
    相关资源
    最近更新 更多