【问题标题】:TPL FromAsync with TaskScheduler and TaskFactoryTPL FromAsync 与 TaskScheduler 和 TaskFactory
【发布时间】:2012-12-06 15:04:12
【问题描述】:

我正在尝试结合使用TaskFactory.FromAsync 创建一个任务管道/有序调度程序。

我希望能够触发 Web 服务请求(使用 FromAsync 来使用 I/O 完成端口)但保持它们的顺序并且在任何时候只执行一个。

目前我不使用FromAsync,所以我可以使用TaskFactory.StartNew(()=>api.DoSyncWebServiceCall()),并依靠TaskFactory 使用的OrderedTaskScheduler 来确保只有一个请求未完成。

我认为在使用 FromAsync 方法时这种行为会保持不变,但事实并非如此:

TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));

所有这些beginGetStuff 方法都在FromAsync 调用中调用(因此虽然它们是按顺序分派的,但同时发生了n api 调用)。

FromAsync 的重载需要一个 TaskScheduler:

public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)

但是文档说:

TaskScheduler 用于调度执行 end 方法的任务。

如您所见,它需要已经构建的IAsyncResult,而不是Func&lt;IAsyncResult&gt;

这需要自定义FromAsync 方法还是我遗漏了什么?任何人都可以建议从哪里开始实施此实施吗?

干杯,

编辑:

我想从调用者那里抽象出这种行为,因此,根据TaskFactory(带有专门的TaskScheduler)的行为,我需要立即返回任务——这个任务不仅会封装@987654339 @Task 也是该任务在等待轮到执行时的排队。

一种可能的解决方案:

class TaskExecutionQueue
{
    private readonly OrderedTaskScheduler _orderedTaskScheduler;
    private readonly TaskFactory _taskFactory;
    public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
    {
        _orderedTaskScheduler = orderedTaskScheduler;
        _taskFactory = new TaskFactory(orderedTaskScheduler);

    }

    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return _taskFactory.StartNew(taskGenerator).Unwrap();
    }
}

但是,这会在 FromAsync 调用发生时使用一个线程。理想情况下,我不必这样做。

【问题讨论】:

    标签: c# .net parallel-processing task-parallel-library


    【解决方案1】:

    您无法安排 IO 任务,因为它们没有与之关联的线程。 Windows 内核提供无线程 IO 操作。启动这些 IO 不涉及托管代码,并且 TaskScheduler 类不会发挥作用。

    因此,您必须延迟启动 IO,直到您确定确实希望网络受到攻击。您可以使用SemaphoreSlim.WaitAsync 来限制当前运行的任务数量。在启动单个 IO 之前等待该方法的结果并等待它。

    【讨论】:

      【解决方案2】:

      最简单的方法是使用TPL Dataflow

      您可以定义一个“块”来接收异步委托流并一次执行一个(等到每个都完成后再开始下一个):

      var block = new ActionBlock<Func<Task>>(func => func());
      

      然后,触发网络服务请求:

      block.Post(() => Task.Factory.FromAsync(...));
      

      或(我更喜欢):

      block.Post(() => client.GetStuffAsync(a, b, c));
      

      如果您只想执行任务,ActionBlock 方法很好。如果你想产生一个输出流,那么看看TransformBlock

      var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func());
      

      您以同样的方式触发您的请求,您可以通过调用ReceiveReceiveAsync 获得结果。

      【讨论】:

      • 现在我想一想——如果 OP 只想要一个未完成的请求......他为什么不只使用一个循环?如果他能做到这一点,他可能应该更喜欢 TPL 数据流。
      • 这是我的第一个想法,但从他的描述看来,他想要更多的是“一劳永逸”的使用模式。
      • 谢谢斯蒂芬 - 这听起来很有趣,我想看看。抱歉没有让我的问题更清楚,但你是对的,我的用例是我可以立即处理任务的地方,尽管它在执行之前排队(与具有专门任务调度程序的 TaskFactory 给你的行为相同)。我还不确定 TPL Dataflow 是否会启用此功能,但会看看!干杯,
      【解决方案3】:

      我已经决定在这里定制一个解决方案......锁很乱而且不受欢迎,但目前,这可以完成我想要的工作。

      public interface ITaskExecutionQueue
      {
          Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
          Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
          int OutstandingTaskCount { get; }
          event EventHandler OutstandingTaskCountChanged;
      }
      
      /// This class ensures that only a single Task is executed at any one time.  They are executed sequentially in order being queued.
      /// The advantages of this class over OrderedTaskScheduler is that you can use any type of Task such as FromAsync (I/O Completion ports) 
      /// which are not able to be scheduled using a traditional TaskScheduler.
      /// Ensure that the `outer` tasks you queue are unstarted.  E.g. <![CDATA[
      /// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
      /// ]]>
      class OrderedTaskExecutionQueue : ITaskExecutionQueue
      {
          private readonly Queue<Task> _queuedTasks = new Queue<Task>();
          private Task _currentTask;
          private readonly object _lockSync = new object();
      
          /// <summary>
          /// Queues a task for execution
          /// </summary>
          /// <typeparam name="TResult"></typeparam>
          /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
          /// <returns></returns>
          public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
          {
              return QueueTask(new Task<Task<TResult>>(taskGenerator));
          }
      
          public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
          {
              Task<TResult> unwrapped = taskGenerator.Unwrap();
              unwrapped.ContinueWith(_ =>
                                     {
                                         EndTask();
                                         StartNextTaskIfQueued();
                                     }, TaskContinuationOptions.ExecuteSynchronously);
      
              lock (_lockSync)
              {
                  _queuedTasks.Enqueue(taskGenerator);
      
                  if (_currentTask == null)
                  {
                      StartNextTaskIfQueued();
                  }
              }
      
              TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
              tcs.TrySetFromTaskIncomplete(unwrapped);
      
              OutstandingTaskCountChanged.Raise(this);
      
              return tcs.Task;
          }
      
          private void EndTask()
          {
              lock (_lockSync)
              {
                  _currentTask = null;
                  _queuedTasks.Dequeue();
              }
      
              OutstandingTaskCountChanged.Raise(this);
          }
      
          private void StartNextTaskIfQueued()
          {
              lock (_lockSync)
              {
                  if (_queuedTasks.Count > 0)
                  {
                      _currentTask = _queuedTasks.Peek();
      
                      _currentTask.RunSynchronously();
                  }
              }
          }
      
          /// <summary>
          /// Includes the currently executing task.
          /// </summary>
          public int OutstandingTaskCount
          {
              get
              {
                  lock (_lockSync)
                  {
                      return _queuedTasks.Count;
                  }
              }
          }
      
          public event EventHandler OutstandingTaskCountChanged;
      }
      

      接收未启动的Task&lt;Task&lt;TResult&gt;&gt; - 这允许队列决定何时执行它并开始FromAsync 调用(这是内部任务)。用法:

      Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
      Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);
      

      【讨论】:

        猜你喜欢
        • 2013-05-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-04-25
        • 2020-12-23
        相关资源
        最近更新 更多