【问题标题】:How can i have both consistent hashing exchange and topic exchange functionality at the same time?我怎样才能同时拥有一致的哈希交换和主题交换功能?
【发布时间】:2019-05-07 06:03:22
【问题描述】:

我有一个水平可扩展的应用程序。我的应用是spring-boot服务,spring框架版本为2.1.1.RELEASE。

每个工作人员都有一个队列(它是各自的侦听器)并绑定到“x-consistent-hash”类型的交换。我使用的是 Rabbitmq 3.7.7 版的 rabbitmq-consistent-hash-exchange 插件。

消息根据其 message_id (as described here) 以一致的方式正确散列,并且始终由我的应用程序的相同工作队列使用,如上所示:

我的实现代码如下:

public static final String EXCHANGE = "e3";
private static final String EXCHANGE_TYPE = "x-consistent-hash";
@Bean
 public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
    }

    @Qualifier("testlistenerAdapter")
    @Bean
    MessageListenerAdapter testlistenerAdapter(TestListener receiver) {
        MessageListenerAdapter msgadapter = new MessageListenerAdapter(receiver, "testMessageReceived");
        return msgadapter;
    }

    @Qualifier("testcontainer")
    @Bean
    SimpleMessageListenerContainer testcontainer(ConnectionFactory connectionFactory,
            @Qualifier("testlistenerAdapter") MessageListenerAdapter listenerAdapter) throws IOException {

        Connection conn = connectionFactory.createConnection();
        Channel ch = conn.createChannel(true);


        ch.queueDeclare(autoDeleteQueue1().getName(), true, true, true, null);
        ch.queuePurge(autoDeleteQueue1().getName());

        Map<String, Object> args = new HashMap<>();
        args.put("hash-property", "message_id");

        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, false, false, args);

        ch.queueBind(autoDeleteQueue1().getName(), EXCHANGE, "20");

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(autoDeleteQueue1().getName());
        container.setMessageListener(listenerAdapter);
        return container;
    }

我想要做的是将具有相同 message_id 的所有消息路由到与以前相同的工作人员但在不同的队列侦听器中。在第一种情况下事情很容易,因为一个工人只有一个队列。既然一个工作人员有多个队列,我希望消息在同一个工作人员处一致地散列。

我尝试通过在每条消息中添加不同的 header/routing_key 来做到这一点,但没有成功。

例如,任何带有 message_id 的消息:“test”总是被路由到同一个 worker(比如说“worker B”),并且取决于它所拥有的 header/routing_key(foo 或 bar)将被 foo 或 bar 使用队列监听器(见下图)。我的应用程序的业务逻辑是有状态的,所以我需要具有相同 message_id 的消息由同一个工作人员提供服务。

我尝试了很多实现替代方案,但似乎这不是一件容易的事。恐怕我想做的事情是不可行的,因为 rabbitmq 插件会散列队列级别。一个快速的解决方案是坚持第一种情况。只保留一种类型的侦听器,然后在同一个端点内进行业务逻辑分离。关于如何实现这样的功能而不必将两个侦听器(foo 和 bar)的功能合并为一个的任何想法?

【问题讨论】:

  • Any ideas of how i could implement such functionality? -- 写一些代码?您的要求似乎很清楚: "test" 始终在同一个工作人员处路由(可以说是“工作人员 B”),并且根据它所具有的 header/routing_key(foo 或 bar)将被 foo 或 bar 队列消耗听众。我不太确定哈希与这些有什么关系。
  • 如果您需要一个模式名称来帮助您启动,API Gateway 怎么样?
  • 我的工人是有状态的。这意味着工作人员的队列侦听器应始终处理具有公共 message_id 的消息。这是棘手的一点。对于 rabbitmq 不同的队列监听器属于不同的队列,所以一致的哈希是在队列级别而不是在工作人员级别完成的。

标签: spring-boot hash rabbitmq broker


【解决方案1】:

这是不可行的。在rabbitmq中,不同的队列监听器属于不同的队列。
由于每个工作人员都创建自己的队列,我只能保证消息将始终进入同一个队列,而不是同一个工作人员。
一致的散列是在队列级别而不是在工作人员级别完成的。

我通过为每个工作人员只维护一个侦听器队列来完成我的实施。通过这种方式,我可以确保消息总是在同一个工作人员处路由。

我的示例项目在这里,如果有人有兴趣看看https://github.com/ubitech/generic-policy-engine-with-consistent-hashing

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多