【问题标题】:.NET queued tasks (with async/await).NET 排队任务(使用 async/await)
【发布时间】:2017-04-21 16:03:16
【问题描述】:

我有大量任务 (~1000) 需要执行。我在 4 核处理器上运行,所以我想一次并行处理 4 个任务。

为了给你一个起点,这里有一些示例代码。

class Program
{
    public class LongOperation
    {
        private static readonly Random RandomNumberGenerator = new Random(0);
        const int UpdateFrequencyMilliseconds = 100;

        public int CurrentProgress { get; set; }

        public int TargetProcess { get; set; }

        public LongOperation()
        {
            TargetProcess = RandomNumberGenerator.Next(
                (int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds, 
                (int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
        }

        public async Task Execute()
        {
            while (!IsCompleted)
            {
                await Task.Delay(UpdateFrequencyMilliseconds);
                CurrentProgress++;
            }
        }

        public bool IsCompleted => CurrentProgress >= TargetProcess;
    }

    static void Main(string[] args)
    {
        Task.Factory.StartNew(async () =>
        {
            var operations = new List<LongOperation>();

            for(var x = 1; x <= 10; x++)
                operations.Add(new LongOperation());

            await ProcessOperations(4, operations);
        }).Wait();
    }

    public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
    {
        await Task.WhenAll(operations.Select(x => x.Execute()));
        // TODO: Process up to 4 operations at a time, until every operation is completed.
    }
}

我想了解一下我将使用哪些类,以及我将如何构造 ProcessOperations 以一次处理最多 4 个操作,直到所有操作都完成,在一个可等待的 @987654324 中@。

我正在考虑以某种方式使用 SemaphoreSlim 对象,因为它似乎是为了限制资源/进程。

【问题讨论】:

  • 您应该查看TPL DataFlow 它是微软为您尝试执行的确切情况提供的库。您只需将 List&lt;LongOperation&gt; 替换为 ActionBlock&lt;LongOperation&gt; 并设置并行限制。
  • 我知道我正在尝试发明一个轮子,只是不知道在哪里可以找到它。谢谢!

标签: .net async-await task-parallel-library tpl-dataflow


【解决方案1】:

正如已经建议的那样,您需要使用方便的TPL Dataflow library,有两个块,用于在处理之前存储消息,并对它们进行实际操作:

// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
    // blocking sending
    operations.Post(new LongOperation());
    // awaitable send for async operations
    // await operations.SendAsync(new LongOperation());
}

您还可以通过为缓冲区设置 BoundedCapacity 选项来引入一些限制,例如一次不超过 30 个操作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-10-20
    • 2017-06-16
    • 2023-03-27
    • 2012-02-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多