【问题标题】:MessageQueue and Async / Await消息队列和异步/等待
【发布时间】:2013-04-12 14:52:30
【问题描述】:

我只想以异步方法接收我的消息!它冻结了我的用户界面

    public async void ProcessMessages()
    {
        MessageQueue MyMessageQueue = new MessageQueue(@".\private$\MyTransactionalQueue");
        MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

        while (true)
        {
            MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction();
            MessageQueueTransaction.Begin();

            ContainError = false;
            ProcessPanel.SetWaiting();

            string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString();

            //Do some process with body string.

            MessageQueueTransaction.Commit();
        }
    }

我只是像任何常规方法一样调用该方法,但它没有工作! 当我使用 BackgroundWorkers 而不是 async/await 时,这段代码可以正常工作

想法?

【问题讨论】:

  • 您的代码是否产生任何警告?您是否阅读过他们所说的并试图理解他们?
  • 查看您的错误列表,您会看到“此异步方法缺少 'await' 运算符并将同步运行”
  • 这不是一个本地化的问题——事实上,SO 中有很多类似的问题,人们认为async 以某种方式使他们的方法异步运行

标签: c# message-queue async-await


【解决方案1】:

正如斯蒂芬所写,异步不会在线程中运行您的代码。幸运的是,您可以使用TaskFactory.FromAsyncMessageQueue.BeginReceive/MessageQueue.EndReceive 来异步接收消息:

    private  async Task<Message> MyAsyncReceive()
    {
        MessageQueue queue=new MessageQueue();
        ...
        var message=await Task.Factory.FromAsync<Message>(
                           queue.BeginReceive(),
                           queue.EndReceive);

        return message;

    }

您应该注意,虽然没有使用 Transaction 的 BeginReceive 版本。来自 BeginReceive 的文档:

不要对事务使用异步调用 BeginReceive。如果要执行事务性异步操作,请调用 BeginPeek,并将事务和(同步)Receive 方法放入为 peek 操作创建的事件处理程序中。

这是有道理的,因为无法保证您必须等待多长时间才能等待响应,或者哪个线程最终会处理完成的调用。

要使用事务,你可以这样写:

    private  async Task<Message> MyAsyncReceive()
    {
        var queue=new MessageQueue();

        var message=await Task.Factory.FromAsync<Message>(queue.BeginPeek(),queue.EndPeek);

        using (var tx = new MessageQueueTransaction())
        {
            tx.Begin();

            //Someone may have taken the last message, don't wait forever
            //Use a smaller timeout if the queue is local
            message=queue.Receive(TimeSpan.FromSeconds(1), tx);
            //Process the results inside a transaction
            tx.Commit();
        }
        return message;
    }

更新

正如 Rob 所指出的,原始代码使用了从 Peek 返回的 message,它可能在 PeekReceive 之间发生了变化。在这种情况下,第二条消息将丢失。

如果另一个客户端读取队列中的最后一条消息,仍然有可能发生阻塞。为了防止这种情况,Receive 应该有一个小的超时。

【讨论】:

  • 这个答案很棒!它有效!如果我想多次运行这个 MyAsyncReceive 方法,他们会共享我的线程吗?所以他们会很慢?
  • 该方法没有什么特别之处。运行时将为每次调用使用线程池中的可用线程。
  • 这不包含竞争条件吗? message 变量可能包含与收到的消息不同的消息,不是吗? (特别是如果有多个消费者)。 peek 将返回 a 消息,但 Receive() 方法可能会收到不同的消息。我会将var message = 放在“Peek 等待者”中并使用var message = queue.Receive(tx); 确保在事务中处理正确的消息..
  • 糟糕,应该是message=queue.Receive(tx)
  • 仅使用收到的消息是不够的,因为其他人可能收到了唯一可用的消息。 Receive 应该有一个非常短的超时时间,或者应该使用游标来确保实际加载了偷看的消息
【解决方案2】:

async does not run your code on a background thread.您上面的代码应该引起编译器警告,告诉您您的方法将同步运行。

如果要在后台线程上执行方法,请使用TaskEx.Run

public void ProcessMessages()
{
  ...
}

TaskEx.Run(() => ProcessMessages());

【讨论】:

  • 如果我使用 run 那么它将创建一个新线程!所以我不能直接修改 UI,我需要使用 Dispatch rigth 吗?像普通线程一样?或者还有其他方法吗?我只需要更新主窗体中的进度条。
  • 我推荐使用IProgress&lt;T&gt;Progress&lt;T&gt; 来处理线程调度。 TAP documentation covers IProgress&lt;T&gt;.
  • 我无法将两个答案标记为正确!我的问题的答案是使用 Task.Factory.FromAsync 但这样我就不会使用新线程的力量。所以我会用运行!谢谢
猜你喜欢
  • 2019-11-17
  • 2019-03-04
  • 1970-01-01
  • 2015-11-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多