【问题标题】:AMQPNETLITE - ActiveMQ Artemis (Red Hat AMQ) - autocreate multi-consumer multicast queueAMQPNETLITE - ActiveMQ Artemis (Red Hat AMQ) - 自动创建多消费者多播队列
【发布时间】:2019-01-05 19:57:47
【问题描述】:

这个问题是关于在 .Net 中使用 AMQP 使用消息的。文档推荐amqpnetlite:https://access.redhat.com/documentation/en-us/red_hat_amq/7.0/html-single/using_the_amq_.net_client/index

使用 AMQPNetLite 订阅地址时,将自动创建地址和队列。不过,自动创建的队列始终是“单播”的。我无法自动创建

  1. 多播队列
  2. 允许任意数量的消费者。

代码:

private async Task RenewSession()
{
    Connect = await Connection.Factory.CreateAsync(new Address("amqp://admin:admin@localhost:5672"), new Open() {ContainerId = "client-1"});
    MqSession = new Session(Connect);
    var receiver = new ReceiverLink(MqSession, DEFAULT_SUBSCRIPTION_NAME, GetSource("test-topic"), null);
    receiver.Start(100, OnMessage);
}

private Source GetSource(string address)
{
    var source = new Source
    {
        Address = address,
        ExpiryPolicy = new Symbol("never"),
        Durable = 2,
        DefaultOutcome = new Modified
        {
            DeliveryFailed = true,
            UndeliverableHere = false
        }
    };
    return source;
}

也许我遗漏了一些标志?

【问题讨论】:

  • 得到第一个问题的解决方案 - 创建多播队列是使用源功能: Capabilities = new[] { new Symbol("topic") }, // 或 "queue" 来请求队列。

标签: amqp jboss-amq


【解决方案1】:

在 AMQP 中,您可以通过设置功能在自动创建队列(任播路由)或主题(多播路由)之间进行选择。

能力应该是new Symbol("queue")new Symbol("topic")

public class SimpleAmqpTest
{
    [Fact]
    public async Task TestHelloWorld()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = await Connection.Factory.CreateAsync(address);
        Session session = new Session(connection);

        Message message = new Message("Hello AMQP");

        Target target = new Target
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        SenderLink sender = new SenderLink(session, "sender-link", target, null);
        await sender.SendAsync(message);

        Source source = new Source
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
        message = await receiver.ReceiveAsync();
        receiver.Accept(message);

        await sender.CloseAsync();
        await receiver.CloseAsync();
        await session.CloseAsync();
        await connection.CloseAsync();
    }
}

看看https://github.com/Azure/amqpnetlite/issues/286,代码来自哪里。

您可以通过在 broker.xml 中设置 default-address-routing-type 来选择默认路由是多播还是任播,所有内容都记录在 https://activemq.apache.org/artemis/docs/2.6.0/address-model.html

代理的multicastPrefixanycastPrefix 功能未针对AMQP 实现。 https://issues.jboss.org/browse/ENTMQBR-795

【讨论】:

  • 不久前我发现了同样的细节,作为对问题的评论添加。您知道问题的第二部分吗 - 如何创建多播队列以允许最大消费者为 -1(任意数量)。注意:地址允许多播队列,是的,但是一个多播队列的最大消费者设置为1
  • @PraveenNayak 不是。 AFAIK 你不能那样自动创建它,也许你可以在 broker.xml 中设置它,或者通过管理。我要问的是为什么你要设置它。
  • 多播路由的想法是你有一个地址,每个消费者在该地址下都有自己的订阅队列,它的消息被传递到那里。所以即使每个订阅队列只有一个consumer,但每个consumer都有一个单独的订阅队列,因此地址上会有多个consumer,没问题。
  • 需要能够将代理用于发布订阅。发布者将“OrderPlaced”发布到多播地址。两个订阅者“InventoryMgmt”和“Billing”分别在 1 个队列中订阅它。我们想通过添加 3 个“Billing”消费者实例来分配“Billing”工作负载,而“InventoryMgmt”有 1 个消费者。在 3 个队列中添加 3 个“计费”消费者将导致所有 3 个消费者各获得一份消息副本。在“计费”队列中工作的所有 3 个消费者将确保工作负载在三个之间分配,可能是循环。我希望这是有道理的。
  • @PraveenNayak 我想我明白了。使用 JMS 术语,它听起来就像“共享订阅”功能jmesnil.net/weblog/2013/06/27/jms-20-shared-subscription。您可以通过设置"shared"(也可能还有"global")功能来实现这一点。此外,在接收者选项中设置订阅名称,例如opts.name("sub-1"); // A stable link name(根据issues.jboss.org/browse/ENTMQCL-580 的第一条评论链接的示例,这将在 qpid-proton-cpp 中)。消息将在所有设置相同名称的接收者之间分发。
猜你喜欢
  • 2019-02-26
  • 1970-01-01
  • 2016-11-14
  • 2014-06-27
  • 2018-12-22
  • 1970-01-01
  • 2014-01-22
  • 1970-01-01
  • 2012-08-11
相关资源
最近更新 更多