【发布时间】:2020-08-30 21:59:45
【问题描述】:
我正在尝试设置与队列的标头交换,在该队列中,消息基于收件人标头进行路由。
交换是标头类型。
到目前为止,该类能够连接到交换器并将消息提供给队列。 它还能够订阅队列并接收消息。每当订阅者的连接被取消时,它也会关闭连接。
目前的问题是邮件不是按收件人的标头值路由的。
给定以下类:
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@Slf4j
public class MyQueue {
private final ConnectionFactory connectionFactory;
private Channel channel;
public MyQueue(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
Map<String, Object> headers = new HashMap<>();
headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
.headers(headers).build();
log.info("Sending message to {}", headers);
channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
message.getMessage().getBytes(StandardCharsets.UTF_8));
log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
return "ok";
} catch (TimeoutException e) {
log.error("Rabbit mq timeout", e);
} catch (IOException e) {
log.error("Rabbit mq io error", e);
}
throw new UndeliverableMessageException();
}
public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
Connection connection = connectionFactory.newConnection();
this.channel = connection.createChannel();
// The map for the headers.
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all");
headers.put(RabbitMqConfig.MATCH_HEADER, recipient);
final String[] consumerTag = new String[1];
Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
{
try {
log.info("Binding to {}", headers);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
headers);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
delivery.getProperties().getHeaders());
sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
true, deliverCallback, tag -> {
sink.complete();
});
} catch (IOException e) {
log.error("RabbitMQ IOException ", e);
}
}
});
return as.doOnCancel(() -> {
try {
if (consumerTag[0] == null) {
log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
channel.close();
return;
}
channel.basicCancel(consumerTag[0]);
channel.close();
log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
} catch (IOException | TimeoutException e) {
log.error("RabbitMQ channel close error", e);
}
});
}
interface MessageListener<T> {
}
}
交换由以下调用声明
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
绑定收件人日志:
Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}
但是,消息不匹配,好像它们是随机路由的
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
为什么x-match: all 不匹配?
【问题讨论】:
-
我只是在非常表面的意义上使用了RabbitMQ,但是你能提供交换声明代码吗?
-
@Gryphon 当然,我已将呼叫添加到问题中
-
所以从我在文档和here 中看到的内容来看,RabbitMQ 不会在消费者端按标头过滤消息。标头绑定用于路由(而不是路由键)到不同的队列。因此,您需要为 mary 排队,为 james 排队,为 john 排队。服务器将根据标头路由到队列,消费者将从各自的队列中读取。
-
好的,这样测试过,它可以通过基于收件人匹配的路由来按预期工作。我以为路由会发生在队列级别,但它实际上发生在交换级别,如果您提供代码 sn-p,我可以将您的答案标记为已接受
-
如果您已经有测试 sn-p,请继续发布(特别是如果它适合您)并接受作为您自己的答案。由于我目前没有办法对其进行测试,因此最好是正确的,而不是我的猜测。