【问题标题】:ActiveMQ + NMS. Prefetch limit not respectedActiveMQ + NMS。不遵守预取限制
【发布时间】:2021-01-08 20:53:19
【问题描述】:

我有一个同步消费者通过这个 URL 连接到一个队列,它也设置了预取限制:

tcp://localhost:61616?nms.prefetchPolicy.all=5

我以编程方式检查限制并将其输出到我的日志中,因此我知道语法是正确的并且正在设置中。

我对预取的理解是同步消费者将被发送 x 条消息,然后一旦它确认了其中的 50%,它将被发送另一批。这不是我看到的行为。

  • 我已在 URL 中将 prefetch 设置为 5
  • 我已经预先填充了一个包含 20 条消息的队列
  • 我记录收到的每条消息
  • 我已将代码中的 message.Acknowledge(); 行注释掉

所以我认为我应该看到 5 条消息出现在我的日志中,并且不会更多,因为我不承认其中任何一条。 但是,我看到所有 20 条消息都出现在日志中。

我是否误解了预取的工作方式,或者我的代码中是否存在错误?

代码(c#、.Net Core):

public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
    log = logger;
    configuration = config;
    this.endpointClient = endpointClient;

    connection = connectionFactory.CreateConnection();
    connection.RedeliveryPolicy = GetRedeliveryPolicy();
    connection.ExceptionListener += new ExceptionListener(OnException);
    connection.Start();
    session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
    queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
    consumer = session.CreateConsumer(queue);
    log.InfoFormat("Prefetch set to:{0}. Maximum configurable:{1}", ((Connection)connection).PrefetchPolicy.QueuePrefetch, PrefetchPolicy.MAX_PREFETCH_SIZE);

    while (true)
    {
        var message = consumer.Receive(TimeSpan.FromSeconds(5));
        if (!Equals(message, null))
        {
            OnMessage(message);
        }
    }
}

public void OnMessage(IMessage message)
{
    log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
    //message.Acknowledge();
}

日志:

[18:07:37 INF] Prefetch set to:5. Maximum configurable:32766
[18:07:38 DBG] Message 0 Received. Attempt:2
[18:07:38 DBG] Message 1 Received. Attempt:2
[18:07:38 DBG] Message 2 Received. Attempt:2
[18:07:38 DBG] Message 3 Received. Attempt:2
[18:07:40 DBG] Message 4 Received. Attempt:2
[18:07:40 DBG] Message 5 Received. Attempt:2
[18:07:40 DBG] Message 6 Received. Attempt:2
[18:07:40 DBG] Message 7 Received. Attempt:2
[18:07:40 DBG] Message 8 Received. Attempt:2
[18:07:40 DBG] Message 9 Received. Attempt:2
[18:07:40 DBG] Message 10 Received. Attempt:2
[18:07:40 DBG] Message 11 Received. Attempt:2
[18:07:40 DBG] Message 12 Received. Attempt:2
[18:07:40 DBG] Message 13 Received. Attempt:2
[18:07:40 DBG] Message 14 Received. Attempt:2
[18:07:40 DBG] Message 15 Received. Attempt:2
[18:07:41 DBG] Message 16 Received. Attempt:2
[18:07:41 DBG] Message 17 Received. Attempt:2
[18:07:41 DBG] Message 18 Received. Attempt:2
[18:07:41 DBG] Message 19 Received. Attempt:2

提前致谢:)

附:我正在使用:

  • ActiveMQ v5.15.12
  • Apache.NMS.ActiveMQ.NetCore v1.7.2 (NuGet)

【问题讨论】:

    标签: c# .net-core activemq prefetch nms


    【解决方案1】:

    预取值并不意味着您可以控制队列调度,它只是控制如果您的应用程序实际上没有进行任何接收处理或消耗缓慢,有多少将被缓冲等待交付给您。这允许代理向客户端提供一些消息,然后继续向其他客户端提供在第一个正在处理其预取时到达的消息。

    一旦您开始使用消息,无论您是否确认消息,客户端都会扩展预取信用,以允许始终存在与预取金额大小相同的积压。这是因为作为消费者,您可能希望批量读取许多消息并一次确认它们,这就是客户端确认和事务消费一样。如果我没记错不是 50%,NMS 客户端会在 70% 调度时充值预取信用,但不管这种情况发生在你从预取缓冲区中提取该数量的消息后。

    如果您想控制从代理发送的消息,那么您需要使用零预取设置和同步接收消费者,以确保在任何给定时间只有一条消息可以发送到客户端。

    【讨论】:

    • 谢谢蒂姆。您的声誉在您之前,我一直希望您能以此为我指明正确的方向,就像您与许多其他有 ActiveMQ 问题的人一样。我现在明白 prefetch 是一个缓冲区,当消费者接受消息而不是确认消息时会重新填充。
    猜你喜欢
    • 2017-09-16
    • 1970-01-01
    • 2023-03-24
    • 1970-01-01
    • 2021-02-03
    • 2022-10-18
    • 1970-01-01
    • 1970-01-01
    • 2011-03-07
    相关资源
    最近更新 更多