【问题标题】:How to dynamically route message to queues in RabbitMQ如何将消息动态路由到 RabbitMQ 中的队列
【发布时间】:2017-09-08 10:18:09
【问题描述】:

我想开发可以将消息动态路由到不同队列(超过 10000 个队列)的解决方案。这就是我目前所拥有的:

  • 交换类型设置为topic。这样我就可以根据路由键将消息路由到不同的队列。
  • 10000 个队列的路由键为#.%numberOfQueue.#%numberOfQueue% 是该队列的简单数值(但可能会更改为更有意义的数值)。
  • 生产者使用如下路由键生成消息:5.10.15.105.10000,这意味着应该将消息路由到具有键 5、10、15、105 和 10000 的队列,因为它们符合队列的模式。

从 java 客户端 API 看是这样的:

String exchangeName = "rabbit.test.exchange";
String exchangeType = "topic";
boolean exchangeDurable = true;
boolean queueDurable = true;
boolean queueExclusive = false;
boolean queueAutoDelete = false;
Map<String, Object> queueArguments = null;

for (int i = 0; i < numberOfQueues; i++) {
    String queueNameIterated = "rabbit.test" + i + ".queue";
    channel.exchangeDeclare(exchangeName, exchangeType, exchangeDurable);
    channel.queueDeclare(queueNameIterated, queueDurable, queueExclusive, queueAutoDelete, queueArguments);

    String routingKey = "#." + i + ".#";
    channel.queueBind(queueNameIterated, exchangeName, routingKey);
}

这就是为从 0 到 9998 的队列的所有消息生成 routingKey 的方式:

private String generateRoutingKey() {
    StringBuilder keyBuilder = new StringBuilder();
    for (int i = 0; i < numberOfQueues - 2; i++) {
        keyBuilder.append(i);
        keyBuilder.append('.');
    }
    String result = keyBuilder.append(numberOfQueues - 2).toString();
    LOGGER.info("generated key: {}", result);
    return result;
}

看起来不错。问题是我不能用这么长的routingKeychannel.basicPublish() 方法:

线程“main”中的异常 java.lang.IllegalArgumentException:短字符串太长; utf-8 编码长度 = 48884,最大值 = 255。 在 com.rabbitmq.client.impl.ValueWriter.writeShortstr(ValueWriter.java:50) 在 com.rabbitmq.client.impl.MethodArgumentWriter.writeShortstr(MethodArgumentWriter.java:74) 在 com.rabbitmq.client.impl.AMQImpl$Basic$Publish.writeArgumentsTo(AMQImpl.java:2319) 在 com.rabbitmq.client.impl.Method.toFrame(Method.java:85) 在 com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:104) 在 com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396) 在 com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:372) 在 com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:690) 在 com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:672) 在 com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:662) 在 com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:192)

我有要求:

  • 从生产者中动态选择队列产生消息的位置。可能只有一个队列、所有队列或 1000 个队列。
  • 我有超过 10000 个不同的队列,可能需要向它们生成相同的消息。

所以问题是:

  1. 我可以使用这么长的密钥吗?如果可以 - 怎么做?
  2. 也许我可以通过exchange或队列的不同配置来实现相同的目标?
  3. 也许有一些哈希函数可以有效区分目的地并将其折叠为 255 个符号?如果是这样,它应该提供处理不同发布的方法(例如如何仅发送到编号为 555 和 8989 的队列?)?
  4. 也许有一些不同的关键策略可以以这种方式使用?
  5. 我还能如何实现我的要求?

【问题讨论】:

    标签: java rabbitmq messaging rabbitmq-exchange


    【解决方案1】:

    不久前我才开始使用 RabbitQM,但仍希望对您有所帮助。路由键中可以有任意多的单词,最多为 255 个字节(如RabbitMQ Tutorial 5 - Topics 中所述)。因此,主题交换似乎不适合您的用例。

    也许您可以在这种情况下使用headers exchange?根据概念描述:

    标头交换设计用于在多个属性上进行路由,这些属性比路由键更容易表示为消息标头。标头交换忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果标头的值等于绑定时指定的值,则认为消息匹配。

    有关示例,请参阅 herehere。正如我所说,我刚开始使用 RabbitMQ,因此,我不确定这是否适合您。如果以后有时间,我会尝试为您构建一个简单的示例。

    【讨论】:

    • 虽然这可能会回答问题,it would be preferable 在此处包含答案的基本部分,并提供链接以供参考。链接和被破坏,未来的用户应该能够在不跟随链接的情况下获得问题的答案。考虑使用您在此处推荐的技术发布一些回答问题的代码。
    猜你喜欢
    • 1970-01-01
    • 2017-03-30
    • 1970-01-01
    • 1970-01-01
    • 2018-07-28
    • 2013-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多