【问题标题】:Using Task.Run to start a continuously running background job with a BlockingCollection使用 Task.Run 以 BlockingCollection 启动连续运行的后台作业
【发布时间】:2019-02-17 23:57:10
【问题描述】:

我正在使用Task.RunBlockingCollection 上启动连续后台处理作业。它有效,但我认为这不是构建代码的理想方式。

此问题涉及以下行:public async Task ProcessQueue()。我收到警告:

This async method lacks 'await' operators and will run synchronously.

如果我删除async,则会出现错误:

'MessageProcessor.ProcessQueue()': not all code paths return a value

这是合理的,因为编译器看不到 getConsumingEnumerable 块。我应该使用async 版本并忽略此警告还是以不同的方式重组我的代码?

注意:假设我有 100 个这样的 MessageProcessor,我不认为为每个线程创建一个单独的线程会有效地使用内存,因此我使用 Task.Run


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Message
{
    public int id { get; set; }
    public int value { get; set; }
}

public class MessageProcessor
{
    private CancellationToken token;
    private BlockingCollection<Message> messageQueue;
    private List<Message> processedMessageList;

    public MessageProcessor(CancellationToken token)
    {
        this.token = token;
        messageQueue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());
        processedMessageList = new List<Message>(10);
    }

    public void Add(Message m)
    {
        if (!messageQueue.IsAddingCompleted)
        {
            messageQueue.Add(m);
        }
    }

    public void CompleteAdding()
    {
        messageQueue.CompleteAdding();
    }

    public async Task ProcessQueue()
    {
        foreach (Message m in messageQueue.GetConsumingEnumerable())
        {
            Console.WriteLine($"Processing. id: {m.id}, value: {m.value}");
            processedMessageList.Add(m);

        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        Random random = new Random();
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        CancellationToken token = tokenSource.Token;
        MessageProcessor messageProcessor = new MessageProcessor(token);

        var messageProcessorResult = Task.Run(() => messageProcessor.ProcessQueue());

        for (int i = 0; i < 10; i++)
        {
            Message m = new Message();
            m.id = i;
            m.value = random.Next(-2, 2);

            Console.WriteLine($"Producing. id: {m.id}, value: {m.value}");
            messageProcessor.Add(m);
        }

        messageProcessor.CompleteAdding();
        messageProcessorResult.Wait();
    }
}

【问题讨论】:

  • 将返回类型从 Task 更改为 void。
  • @HansPassant async void 是一种不好的做法,看到有 800k 积分的人提出这个建议真是太可惜了
  • 很确定他的意思是没有 async 这是 OP 尝试的一部分
  • 删除async Task 并使其void 返回是正确的选择。 Task.CompletedTask 是有意义的,例如,如果这是实现一个需要 Task 的接口,但这里方法签名在我们的控制范围内。
  • @Evk 我看到你的一个 cmets 关于“BlockingCollection with ConcurrentQueue ...将在删除时阻塞”(stackoverflow.com/a/41271450/10984827)。在那种情况下,我的 ProcessQueue 方法会在这里阻塞吗?您是否推荐另一种构建此代码的方法?谢谢

标签: c# .net async-await task


【解决方案1】:

您需要的是action block

【讨论】:

    【解决方案2】:

    去掉async,在ProcessQueue方法的末尾写下:

    return Task.CompletedTask;
    

    这样我们解决了not all code paths return a value错误。

    【讨论】:

    • 编译器已经告诉他删除 async 关键字。在不需要 await 时将方法声明为异步是没有意义的。
    猜你喜欢
    • 2013-10-21
    • 2013-07-22
    • 2013-09-24
    • 2023-03-17
    • 2015-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多