【问题标题】:Delayed Post to DataFlow延迟发布到 DataFlow
【发布时间】:2017-08-07 16:58:57
【问题描述】:

将项目发布到TPL DataFlow 时,是否有任何机制可以允许延迟发布?

public partial class BasicDataFlowService
{
    private readonly ActionBlock<string> workerBlock;

    public BasicDataFlowService()
    {
        workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 32
        });
    }

    partial void DoWork(string fileName);

    private void AddToDataFlow(string file)
    {
        workerBlock.Post(file);
    }
}

AddToDataFlow 内,我希望能够在处理项目之前指定延迟(例如,如果我们决定将处理延迟 30 秒)。

我确实考虑过使用TransFormBlocknew System.Threading.ManualResetEvent(false).WaitOne(1000);,例如

var requeueBlock = new TransformBlock<string, string>(file =>
{
    new System.Threading.ManualResetEvent(false).WaitOne(1000);
    return file;
});

requeueBlock.LinkTo(workerBlock);

但是,这似乎会不必要地消耗一个线程,而该线程可能会被链中的其他块使用。

【问题讨论】:

    标签: c# tpl-dataflow dataflow


    【解决方案1】:

    首先,您需要将ManualResetEvent 存储为单例,否则所有线程都会有自己的对象等待,您的方法将不起作用。

    其次,如果您需要在管道中的一个AppDomain 内进行同步,请考虑使用ManualResetEventSlim 版本而不是重度ManualResetEvent

    如果你想重用你机器的内核而不是无用的等待,你应该研究SpinWait轻量级结构。在这种情况下,您可能会发现 Joseph Albahari' article 很有用:

    // singleton variable
    bool _proceed;
    
    var requeueBlock = new TransformBlock<string, string>(file =>
    {
        var spinWait = new SpinWait();
        while (!_proceed)
        {
            // ensure we have the latest _proceed value
            Thread.MemoryBarrier();
            // try to spin for a while
            // after some spins, yield to another thread
            spinWait.SpinOnce();
        }
        return file;
    });
    

    SpinWait 内部决定如何让步:使用Sleep(0)Sleep(1)Yield 方法调用,因此对您的情况非常有效。

    【讨论】:

      【解决方案2】:

      要在向workerBlock 发布值之前添加延迟,您可以简单地插入延迟并在发布值之前等待它。如果您的workerBlock 容量有限,您可以await SendAsync。实现目标的几个选项:

      private async Task AddToDataflow(string file, TimeSpan delay) {
          await Task.Delay(delay);
          await workerBlock.SendAsync(file);
      }
      
      private async Task AddToDataflow(string file) {
          var delay = TimeSpan.FromSeconds(30);
          await Task.Delay(delay);
          await workerBlock.SendAsync(file);
      }
      
      private async void AddToDataflow(string file) {
          var delay = TimeSpan.FromSeconds(30);
          await Task.Delay(delay);
          workerBlock.Post(file);
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2015-09-14
        • 2014-05-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多