【发布时间】:2016-04-22 15:43:18
【问题描述】:
我正在尝试设置 RabbitMQ 消息队列,以便我可以发送消息以启动长时间运行的进程,并且还能够发送消息以取消该长时间运行的进程(如果需要)。所以我从EventingBasicConsumer 开始,并在我的Recieved 处理程序中做了类似的事情:
if (startProcess)
{
// start a long running process
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
这不起作用,因为EventingBasicConsumer 不是多线程的,一次只能处理一条消息。所以它不能处理取消消息,直到它完成长时间运行的过程(此时,显然没有意义)。所以接下来我尝试了这个:
if (startProcess)
{
Task.Run(() => {
// start a long running process
}
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
这很有效。我现在可以取消长时间运行的进程...但是,我承认请求立即运行长时间运行的进程,而不是在它完成之后。这意味着如果长时间运行的进程崩溃,则该消息已被删除。因此,这将需要原始发送者进行跟踪,并让接收者必须发回消息说它已经完成,这一切都变得有点复杂。
所以我想也许我可以将EventingBasicConsumer 更改为始终在新线程上触发其Received 事件。所以我创造了这样的东西:
public class AsyncRabbitConsumer : DefaultBasicConsumer
{
// code all the same as EventingBasicConsumer except this bit:
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
if (Received != null)
{
var args = new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
Task.Run(() =>
{
Received(this, args);
});
}
}
}
现在在我的第一个 sn-p 代码中,我可以让它在长时间运行的进程仍在运行时处理取消消息并且长时间运行的进程不会确认并删除它的消息,直到它实际上已经完成(或取消)。所以那应该很棒......除非我取消我得到这个:
在 RabbitMQ.Client.dll 中出现“RabbitMQ.Client.Exceptions.AlreadyClosedException”类型的异常,但未在用户代码中处理
附加信息:已经关闭:AMQP操作被中断:AMQP关闭原因,由Peer发起,code=406,text="PRECONDITION_FAILED - unknown delivery tag 3",classId=60,methodId=80,cause=
从似乎是启动长时间运行进程的线程的channel.BasicAck 步骤。那么这里发生了什么?我认为确认(首先是取消消息,然后是长时间运行的进程消息)在这里被越过了。有什么体面的方法可以解决这个问题吗?还是我找错树了?
可能值得注意的是,取消长时间运行的进程不是即时的。它将在下一个方便的时间点取消,因此几乎可以肯定取消消息将在长时间运行的进程结束之前完成处理。
【问题讨论】:
-
@Rob: 因为我上面有一个例外。
-
抱歉 - 我的错,我略过了那部分。
-
您如何管理连接?看起来您的频道正在被处置,这就是导致您失败的原因。还有——有多少工人?如果您有多个,则您的第二个工作人员可能正在取消,并试图取消一个不存在的任务。您可以让您的工作人员监视特定频道上的取消,或指定路由键
-
@Rob:频道是类中的一个字段。在应用程序关闭之前不会处理它。
-
如果取消长时间运行的进程,是否要重新排队消息?我假设是的,但只是为了确定......