【发布时间】:2022-02-01 15:51:01
【问题描述】:
我需要将具有多个路由键的多个交换绑定到一个队列,并且能够通过交换和路由键发送消息并通过队列名称侦听队列来接收它。
我的代码:
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(ExchangeConfig.class)
public class RabbitConfig {
private final ExchangeConfig exchangeConfig;
@Bean
public List<Binding> bindings() {
List<Binding> bindings = new ArrayList<>();
exchangeConfig.getExchangesWithKeys()
.forEach(exchangeWithKeys -> exchangeWithKeys.getRoutingKeys()
.forEach(key -> {
Exchange exchange = ExchangeBuilder.directExchange(exchangeWithKeys.getExchange()).build();
Queue queue = QueueBuilder.durable(exchangeConfig.getLogsQueue()).build();
Binding binding = BindingBuilder.bind(queue).to(exchange)
.with(key).noargs();
bindings.add(binding);
}));
return bindings;
}
}
配置:
spring:
rabbitmq:
host: localhost
port: 5672
rabbitmq:
exchanges-with-keys:
- exchange: exchange1
routing-keys: exchange1.live, exchange1.after
- exchange: exchange2
routing-keys: exchange2.live, exchange2.after
- exchange: exchange3
routing-keys: exchange3.live, exchange3.after
logs-queue: log-messages_q
道具:
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class ExchangeConfig {
private String logsQueue;
private List<ExchangeWithKeys> exchangesWithKeys;
@Data
public static class ExchangeWithKeys {
private String exchange;
private List<String> routingKeys;
}
}
听众:
@Component
@Slf4j
@RequiredArgsConstructor
public class LogsListener {
private final LogMessageEventProcessor logMessageEventProcessor;
@RabbitListener(queues = "${rabbitmq.logs-queue}")
public void onLiveEvent(LogMessageEvent event) {
log.info("Received log event message [{}]", event.getBody());
logMessageEventProcessor.processLogMessageEvent(event);
}
}
测试:
@SpringBootTest
@ContextConfiguration(initializers = LogsListenerTest.Initializer.class)
class LogsListenerTest {
@Autowired
private RabbitTemplate template;
@ClassRule
private static final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.7.25-management-alpine")
.withExposedPorts(5672, 15672).withQueue("log-messages_q");
@BeforeAll
private static void startRabbit() {container.start();}
@AfterAll
private static void stopRabbit() {
container.stop();
}
@Test
public void test() {
template.convertAndSend("exchange1", "exchange1.live", new LogMessageEvent());
template.receiveAndConvert("log-messages_q");
}
public static class Initializer implements
ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
val values = TestPropertyValues.of(
"spring.rabbitmq.host=" + container.getContainerIpAddress(),
"spring.rabbitmq.port=" + container.getMappedPort(5672)
);
values.applyTo(configurableApplicationContext);
}
}
}
上面的一切都不起作用。
那么我应该把这些绑定放在哪里才能让它工作呢?谢谢。
【问题讨论】:
标签: spring-boot rabbitmq spring-rabbit