【问题标题】:BasicAck when processing messages asynchronously异步处理消息时的 BasicAck
【发布时间】:2016-04-22 15:43:18
【问题描述】:

我正在尝试设置 RabbitMQ 消息队列,以便我可以发送消息以启动长时间运行的进程,并且还能够发送消息以取消该长时间运行的进程(如果需要)。所以我从EventingBasicConsumer 开始,并在我的Recieved 处理程序中做了类似的事情:

if (startProcess) 
{
    // start a long running process
}
else if (cancelProcess)
{
    // cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);

这不起作用,因为EventingBasicConsumer 不是多线程的,一次只能处理一条消息。所以它不能处理取消消息,直到它完成长时间运行的过程(此时,显然没有意义)。所以接下来我尝试了这个:

if (startProcess) 
{
    Task.Run(() => {
        // start a long running process
    }
}
else if (cancelProcess)
{
    // cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);

这很有效。我现在可以取消长时间运行的进程...但是,我承认请求立即运行长时间运行的进程,而不是在它完成之后。这意味着如果长时间运行的进程崩溃,则该消息已被删除。因此,这将需要原始发送者进行跟踪,并让接收者必须发回消息说它已经完成,这一切都变得有点复杂。

所以我想也许我可以将EventingBasicConsumer 更改为始终在新线程上触发其Received 事件。所以我创造了这样的东西:

public class AsyncRabbitConsumer : DefaultBasicConsumer
{
    // code all the same as EventingBasicConsumer except this bit:
    public override void HandleBasicDeliver(string consumerTag,
        ulong deliveryTag,
        bool redelivered,
        string exchange,
        string routingKey,
        IBasicProperties properties,
        byte[] body)
    {
        base.HandleBasicDeliver(consumerTag,
            deliveryTag,
            redelivered,
            exchange,
            routingKey,
            properties,
            body);
        if (Received != null)
        {
            var args = new BasicDeliverEventArgs(consumerTag,
                    deliveryTag,
                    redelivered,
                    exchange,
                    routingKey,
                    properties,
                    body);

            Task.Run(() =>
            {
                Received(this, args);
            });
        }
    }
}

现在在我的第一个 sn-p 代码中,我可以让它在长时间运行的进程仍在运行时处理取消消息并且长时间运行的进程不会确认并删除它的消息,直到它实际上已经完成(或取消)。所以那应该很棒......除非我取消我得到这个:

在 RabbitMQ.Client.dll 中出现“RabbitMQ.Client.Exceptions.AlreadyClosedException”类型的异常,但未在用户代码中处理

附加信息:已经关闭:AMQP操作被中断:AMQP关闭原因,由Peer发起,code=406,text="PRECONDITION_FAILED - unknown delivery tag 3",classId=60,methodId=80,cause=

从似乎是启动长时间运行进程的线程的channel.BasicAck 步骤。那么这里发生了什么?我认为确认(首先是取消消息,然后是长时间运行的进程消息)在这里被越过了。有什么体面的方法可以解决这个问题吗?还是我找错树了?

可能值得注意的是,取消长时间运行的进程不是即时的。它将在下一个方便的时间点取消,因此几乎可以肯定取消消息将在长时间运行的进程结束之前完成处理。

【问题讨论】:

  • @Rob: 因为我上面有一个例外。
  • 抱歉 - 我的错,我略过了那部分。
  • 您如何管理连接?看起来您的频道正在被处置,这就是导致您失败的原因。还有——有多少工人?如果您有多个,则您的第二个工作人员可能正在取消,并试图取消一个不存在的任务。您可以让您的工作人员监视特定频道上的取消,或指定路由键
  • @Rob:频道是类中的一个字段。在应用程序关闭之前不会处理它。
  • 如果取消长时间运行的进程,是否要重新排队消息?我假设是的,但只是为了确定......

标签: c# rabbitmq amqp


【解决方案1】:

你可以做的是有类似消费者对的东西——第一个是长时间运行的进程,第二个是一个代理来终止长时间运行的进程。第一个将接收消息,处理它并在完成处理时确认,如果检测到终止信号,也会做确认。这对中的代理显然会收到取消消息并杀死第一个,并且还会产生第一个的另一个实例。显然,这需要进程(消费者)在 RMQ 之外进行通信。

想到的另一件事(但我从未尝试过这样的事情)是您在消费者中将 prefetch count 设置为 2,并在“处理单个数据消息”时发布发送给代理的第二条消息(转发),除非它是 CANCEL 消息,在这种情况下,您在中止处理后确认它们 - CANCEL 和 DATA(这样称呼它)消息。

另一种选择可能是在“长时间运行的进程”中,您有两个消费者线程,每个线程都使用自己的通道。

【讨论】:

    【解决方案2】:

    我遇到了同样的错误,因为在 BasicConsume 方法中 autoAck 标志为真。现在我已将标志更改为 false,并且在长时间运行的进程的 BasicAck 方法中没有出现错误。

    channel.BasicConsume(queue: "test", autoAck: false, consumer: consumer);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-12
      • 1970-01-01
      • 1970-01-01
      • 2016-08-19
      • 1970-01-01
      • 1970-01-01
      • 2011-09-14
      • 1970-01-01
      相关资源
      最近更新 更多