【问题标题】:Let the tasks race让任务竞赛
【发布时间】:2014-06-06 22:55:38
【问题描述】:

假设我有一个 BlockingCollection OutputQueue,它有很多项目。目前我的代码是:

    public void Consumer()
    {
        foreach (var workItem in OutputQueue.GetConsumingEnumerable())
        {
            PlayMessage(workItem);
            Console.WriteLine("Works on {0}", workItem.TaskID);
            OutLog.Write("Works on {0}", workItem.TaskID);
            Thread.Sleep(500);
        }
    }

现在我希望PlayMessage(workItem) 以多任务方式运行,因为一些 workItem 需要更多时间,其他需要更少时间。差别很大。

至于方法PlayMessage(workItem),它有一些服务调用,播放文本到语音和一些日志记录。

 bool successRouting = serviceCollection.SvcCall_GetRoutingData(string[] params, out ex);
 bool successDialingService = serviceCollection.SvcCall_GetDialingServiceData(string[] params, out excep);
 PlayTTS(workItem.TaskType); //  playing text to speech

那么如何更改我的代码?

我的想法是:

public async Task Consumer()
{
    foreach (var workItem in OutputQueue.GetConsumingEnumerable())
    {
        await PlayMessage(workItem);
        Console.WriteLine("Works on {0}", workItem.TaskID);
        OutLog.Write("Works on {0}", workItem.TaskID);
        Thread.Sleep(500);
    }
}

【问题讨论】:

  • 澄清一下,您希望多个 TTS 操作同时进行吗?所以不同的线程会同时说不同的事情?
  • 是的,有可能。这是一个电话应用程序。我们的客户在美国的不同地点。只要我们有足够的端口,同时玩TTS就可以了。比方说 AT&T,客户可以同时使用服务。

标签: c#-4.0 task-parallel-library async-await c#-5.0


【解决方案1】:

由于您希望 PlayMessage 具有并行性,我建议您查看TPL Dataflow,因为它结合了并行工作和异步,因此您可以正常等待您的工作。

TPL Dataflow 由 Blocks 构成,每个 Block 都有自己的特点。 一些受欢迎的有:

ActionBlock<TInput>
TransformBlock<T, TResult>

我会构造如下内容:

var workItemBlock = new ActionBlock<WorkItem>(
    workItem =>
    {
        PlayMessage(workItem);
        Console.WriteLine("Works on {0}", workItem.TaskID);
        OutLog.Write("Works on {0}", workItem.TaskID);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = // Set max parallelism as you wish..
    });

foreach (var workItem in OutputQueue.GetConsumingEnumerable())
{
    workItemBlock.Post(workItem);
}

workItemBlock.Complete();

【讨论】:

  • 如果我不想并行,我只需要异步。代码如何?我不确定我们是否可以在 PlayMessage 中应用异步,因为它有服务调用 serviceCollection.SvcCall_GetRoutingData 等?
  • 为了调用异步方法,它必须是异步的:)。我不确定我是否了解您的需求。此代码示例将在不使用异步的情况下并行加载您的工作。
  • 根据您的问题,您说您想使用“多个任务”来并行调用 PlayMessage。这就是代码的作用。
  • 我的意思是如果 MaxDegreeOfParallelism=1,我们也可以在 PlayMessage 中等待 serviceCollection.SvcCall_GetRoutingData 吗?可能我的表述不清楚,抱歉。
  • 如果将 MaxDegreeOfParllelism 设置为 1,只需像在普通 foreach 循环中那样运行代码,这不是并行的。只有当您的 API 公开了一个异步端点时,您才能等待 PlayMessage,在这种情况下我认为它没有,所以我猜您将无法等待您的服务调用。
【解决方案2】:

这是另一种解决方案,不是基于 TPL 数据流。它使用 SemaphoreSlim 来限制并行播放的数量(警告,未经测试):

public async Task Consumer()
{
    var semaphore = new SemaphoreSlim(NUMBER_OF_PORTS);
    var pendingTasks = new HashSet<Task>();
    var syncLock = new Object();

    Action<Task> queueTaskAsync = async(task) =>
    {
        // be careful with exceptions inside "async void" methods

        // keep failed/cancelled tasks in the list
        // they will be observed outside
        lock (syncLock)
            pendingTasks.Add(task);

        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCancelled && !task.IsFaulted)
                throw;
            // the error will be observed later, 
            // keep the task in the list
            return;
        }
        finally
        {
            semaphore.Release();
        }

        // remove successfully completed task from the list
        lock (syncLock)
            pendingTasks.Remove(task);
    };

    foreach (var workItem in OutputQueue.GetConsumingEnumerable())
    {
        var item = workItem;
        Func<Task> workAsync = async () =>
        {
            await PlayMessage(item);
            Console.WriteLine("Works on {0}", item.TaskID);
            OutLog.Write("Works on {0}", item.TaskID);
            Thread.Sleep(500);
        });

        var task = workAsync();
        queueTaskAsync(task);
    }

    await Task.WhenAll(pendingTasks.ToArray());
}

【讨论】:

  • 还有Cannot convert lambda expression to type "System.Threading.Tasks.Task",在queueTaskAsync(async()=&gt;
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-12
  • 1970-01-01
  • 1970-01-01
  • 2022-11-02
相关资源
最近更新 更多