【发布时间】:2019-05-07 06:03:22
【问题描述】:
我有一个水平可扩展的应用程序。我的应用是spring-boot服务,spring框架版本为2.1.1.RELEASE。
每个工作人员都有一个队列(它是各自的侦听器)并绑定到“x-consistent-hash”类型的交换。我使用的是 Rabbitmq 3.7.7 版的 rabbitmq-consistent-hash-exchange 插件。
消息根据其 message_id (as described here) 以一致的方式正确散列,并且始终由我的应用程序的相同工作队列使用,如上所示:
我的实现代码如下:
public static final String EXCHANGE = "e3";
private static final String EXCHANGE_TYPE = "x-consistent-hash";
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Qualifier("testlistenerAdapter")
@Bean
MessageListenerAdapter testlistenerAdapter(TestListener receiver) {
MessageListenerAdapter msgadapter = new MessageListenerAdapter(receiver, "testMessageReceived");
return msgadapter;
}
@Qualifier("testcontainer")
@Bean
SimpleMessageListenerContainer testcontainer(ConnectionFactory connectionFactory,
@Qualifier("testlistenerAdapter") MessageListenerAdapter listenerAdapter) throws IOException {
Connection conn = connectionFactory.createConnection();
Channel ch = conn.createChannel(true);
ch.queueDeclare(autoDeleteQueue1().getName(), true, true, true, null);
ch.queuePurge(autoDeleteQueue1().getName());
Map<String, Object> args = new HashMap<>();
args.put("hash-property", "message_id");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, false, false, args);
ch.queueBind(autoDeleteQueue1().getName(), EXCHANGE, "20");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(autoDeleteQueue1().getName());
container.setMessageListener(listenerAdapter);
return container;
}
我想要做的是将具有相同 message_id 的所有消息路由到与以前相同的工作人员但在不同的队列侦听器中。在第一种情况下事情很容易,因为一个工人只有一个队列。既然一个工作人员有多个队列,我希望消息在同一个工作人员处一致地散列。
我尝试通过在每条消息中添加不同的 header/routing_key 来做到这一点,但没有成功。
例如,任何带有 message_id 的消息:“test”总是被路由到同一个 worker(比如说“worker B”),并且取决于它所拥有的 header/routing_key(foo 或 bar)将被 foo 或 bar 使用队列监听器(见下图)。我的应用程序的业务逻辑是有状态的,所以我需要具有相同 message_id 的消息由同一个工作人员提供服务。
我尝试了很多实现替代方案,但似乎这不是一件容易的事。恐怕我想做的事情是不可行的,因为 rabbitmq 插件会散列队列级别。一个快速的解决方案是坚持第一种情况。只保留一种类型的侦听器,然后在同一个端点内进行业务逻辑分离。关于如何实现这样的功能而不必将两个侦听器(foo 和 bar)的功能合并为一个的任何想法?
【问题讨论】:
-
Any ideas of how i could implement such functionality?-- 写一些代码?您的要求似乎很清楚: "test" 始终在同一个工作人员处路由(可以说是“工作人员 B”),并且根据它所具有的 header/routing_key(foo 或 bar)将被 foo 或 bar 队列消耗听众。我不太确定哈希与这些有什么关系。 -
如果您需要一个模式名称来帮助您启动,API Gateway 怎么样?
-
我的工人是有状态的。这意味着工作人员的队列侦听器应始终处理具有公共 message_id 的消息。这是棘手的一点。对于 rabbitmq 不同的队列监听器属于不同的队列,所以一致的哈希是在队列级别而不是在工作人员级别完成的。
标签: spring-boot hash rabbitmq broker