【问题标题】:Proper way to implement a never ending task. (Timers vs Task)执行永无止境的任务的正确方法。 (计时器与任务)
【发布时间】:2012-11-21 15:24:13
【问题描述】:

因此,只要应用程序正在运行或请求取消,我的应用程序就需要几乎连续地执行一个操作(每次运行之间暂停 10 秒左右)。它需要完成的工作可能需要长达 30 秒。

最好使用 System.Timers.Timer 并使用 AutoReset 来确保它在前一个“滴答”完成之前不执行操作。

或者我应该在 LongRunning 模式下使用带有取消标记的一般任务,并在其中有一个常规的无限 while 循环调用操作,在调用之间使用 10 秒 Thread.Sleep 执行工作?至于异步/等待模型,我不确定它在这里是否合适,因为我没有任何工作返回值。

CancellationTokenSource wtoken;
Task task;

void StopWork()
{
    wtoken.Cancel();

    try 
    {
        task.Wait();
    } catch(AggregateException) { }
}

void StartWork()
{
    wtoken = new CancellationTokenSource();

    task = Task.Factory.StartNew(() =>
    {
        while (true)
        {
            wtoken.Token.ThrowIfCancellationRequested();
            DoWork();
            Thread.Sleep(10000);
        }
    }, wtoken, TaskCreationOptions.LongRunning);
}

void DoWork()
{
    // Some work that takes up to 30 seconds but isn't returning anything.
}

或者在使用它的 AutoReset 属性时只使用一个简单的计时器,然后调用 .Stop() 来取消它?

【问题讨论】:

  • 考虑到您要实现的目标,任务似乎有点过头了。 en.wikipedia.org/wiki/KISS_principle。在 OnTick() 开始时停止计时器,检查布尔值以查看您是否应该做任何事情,做工作,完成后重新启动计时器。

标签: c# multithreading timer task-parallel-library .net-4.5


【解决方案1】:

这是我想出的:

  • NeverEndingTask 继承并用您想要做的工作覆盖ExecutionCore 方法。
  • 更改 ExecutionLoopDelayMs 允许您调整循环之间的时间,例如如果您想使用退避算法。
  • Start/Stop 提供同步接口来启动/停止任务。
  • LongRunning 表示每个NeverEndingTask 将获得一个专用线程。
  • 与上述基于ActionBlock 的解决方案不同,此类不会在循环中分配内存。
  • 下面的代码是草图,不一定是生产代码:)

public abstract class NeverEndingTask
{
    // Using a CTS allows NeverEndingTask to "cancel itself"
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    protected NeverEndingTask()
    {
         TheNeverEndingTask = new Task(
            () =>
            {
                // Wait to see if we get cancelled...
                while (!_cts.Token.WaitHandle.WaitOne(ExecutionLoopDelayMs))
                {
                    // Otherwise execute our code...
                    ExecutionCore(_cts.Token);
                }
                // If we were cancelled, use the idiomatic way to terminate task
                _cts.Token.ThrowIfCancellationRequested();
            },
            _cts.Token,
            TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning);

        // Do not forget to observe faulted tasks - for NeverEndingTask faults are probably never desirable
        TheNeverEndingTask.ContinueWith(x =>
        {
            Trace.TraceError(x.Exception.InnerException.Message);
            // Log/Fire Events etc.
        }, TaskContinuationOptions.OnlyOnFaulted);

    }

    protected readonly int ExecutionLoopDelayMs = 0;
    protected Task TheNeverEndingTask;

    public void Start()
    {
       // Should throw if you try to start twice...
       TheNeverEndingTask.Start();
    }

    protected abstract void ExecutionCore(CancellationToken cancellationToken);

    public void Stop()
    {
        // This code should be reentrant...
        _cts.Cancel();
        TheNeverEndingTask.Wait();
    }
}

【讨论】:

    【解决方案2】:

    我发现新的基于任务的界面对于执行此类操作非常简单 - 甚至比使用 Timer 类更容易。

    您可以对示例进行一些小的调整。而不是:

    task = Task.Factory.StartNew(() =>
    {
        while (true)
        {
            wtoken.Token.ThrowIfCancellationRequested();
            DoWork();
            Thread.Sleep(10000);
        }
    }, wtoken, TaskCreationOptions.LongRunning);
    

    你可以这样做:

    task = Task.Run(async () =>  // <- marked async
    {
        while (true)
        {
            DoWork();
            await Task.Delay(10000, wtoken.Token); // <- await with cancellation
        }
    }, wtoken.Token);
    

    这样,如果在Task.Delay 内,取消将立即发生,而不必等待Thread.Sleep 完成。

    另外,使用Task.Delay 而不是Thread.Sleep 意味着您不会在睡眠期间绑定线程。

    如果可以的话,您还可以让DoWork() 接受取消令牌,这样取消的响应速度会更快。

    【讨论】:

    • 如果你使用异步 lambda 作为 Task.Factory.StartNew 的参数,你会得到什么任务 - blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx 当你做 task.Wait();请求取消后,您将等待不正确的任务。
    • 是的,这实际上应该是 Task.Run now,它具有正确的重载。
    • 根据http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx 看起来Task.Run 使用线程池,因此您的示例使用Task.Run 而不是Task.Factory.StartNewTaskCreationOptions.LongRunning 不会做完全相同的事情 - 如果我需要使用LongRunning 选项的任务,我不能像你展示的那样使用Task.Run,还是我错过了什么?
    • @Lumirris:异步/等待的目的是避免在线程执行的整个过程中占用线程(这里,在延迟调用期间,任务不使用线程)。所以使用LongRunning 有点不符合不占用线程的目标。如果你想保证在它自己的线程上运行,你可以使用它,但是在这里你将启动一个大部分时间都在休眠的线程。用例是什么?
    • @Lumirris:你是对的,没有办法指定这个; Task.Run (referencesource.microsoft.com/#mscorlib/system/threading/Tasks/…) 与具有默认选项的 Task.Factory.StartNew (referencesource.microsoft.com/#mscorlib/system/threading/Tasks/…) 基本相同。 (但它确实指定了 DenyChildAttach。)
    【解决方案3】:

    我会为此使用TPL Dataflow(因为您使用的是.NET 4.5,它在内部使用Task)。您可以轻松地创建一个ActionBlock&lt;TInput&gt;,它会在处理完其操作并等待适当的时间后将项目发布给自己。

    首先,创建一个工厂来创建你永无止境的任务:

    ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
        Action<DateTimeOffset> action, CancellationToken cancellationToken)
    {
        // Validate parameters.
        if (action == null) throw new ArgumentNullException("action");
    
        // Declare the block variable, it needs to be captured.
        ActionBlock<DateTimeOffset> block = null;
    
        // Create the block, it will call itself, so
        // you need to separate the declaration and
        // the assignment.
        // Async so you can wait easily when the
        // delay comes.
        block = new ActionBlock<DateTimeOffset>(async now => {
            // Perform the action.
            action(now);
    
            // Wait.
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
                // Doing this here because synchronization context more than
                // likely *doesn't* need to be captured for the continuation
                // here.  As a matter of fact, that would be downright
                // dangerous.
                ConfigureAwait(false);
    
            // Post the action back to the block.
            block.Post(DateTimeOffset.Now);
        }, new ExecutionDataflowBlockOptions { 
            CancellationToken = cancellationToken
        });
    
        // Return the block.
        return block;
    }
    

    我选择了ActionBlock&lt;TInput&gt; 来获取DateTimeOffset structure;你必须传递一个类型参数,它也可以传递一些有用的状态(如果你愿意,你可以改变状态的性质)。

    另外,请注意ActionBlock&lt;TInput&gt; 默认情况下一次只处理 一个 项,因此您可以保证只处理一个操作(也就是说,您不必处理与reentrancy 一起调用Post extension method 本身)。

    我还将CancellationToken structure 传递给ActionBlock&lt;TInput&gt; 的构造函数和Task.Delay method 调用;如果流程被取消,将在第一时间取消。

    从那里,您可以轻松地重构代码以存储由ActionBlock&lt;TInput&gt; 实现的ITargetBlock&lt;DateTimeoffset&gt; interface(这是表示作为消费者的块的更高级别的抽象,您希望能够通过调用Post扩展方法):

    CancellationTokenSource wtoken;
    ActionBlock<DateTimeOffset> task;
    

    您的StartWork 方法:

    void StartWork()
    {
        // Create the token source.
        wtoken = new CancellationTokenSource();
    
        // Set the task.
        task = CreateNeverEndingTask(now => DoWork(), wtoken.Token);
    
        // Start the task.  Post the time.
        task.Post(DateTimeOffset.Now);
    }
    

    然后是你的StopWork 方法:

    void StopWork()
    {
        // CancellationTokenSource implements IDisposable.
        using (wtoken)
        {
            // Cancel.  This will cancel the task.
            wtoken.Cancel();
        }
    
        // Set everything to null, since the references
        // are on the class level and keeping them around
        // is holding onto invalid state.
        wtoken = null;
        task = null;
    }
    

    为什么要在这里使用 TPL 数据流?几个原因:

    关注点分离

    CreateNeverEndingTask 方法现在可以说是创建“服务”的工厂。您可以控制它何时启动和停止,它是完全独立的。您不必将计时器的状态控制与代码的其他方面交织在一起。您只需创建块、启动它并在完成后停止它。

    更有效地使用线程/任务/资源

    TPL 数据流中块的默认调度程序与Task 相同,即线程池。通过使用ActionBlock&lt;TInput&gt; 来处理您的操作,以及调用Task.Delay,您可以在您实际上没有做任何事情时让出对您正在使用的线程的控制。当然,当您生成将处理延续的新 Task 时,这实际上会导致一些开销,但这应该很小,考虑到您没有在一个紧密的循环中处理它(您在调用之间等待十秒钟) .

    如果DoWork 函数实际上可以等待(即,它返回一个Task),那么您可以(可能)通过调整上面的工厂方法来进一步优化它以取而代之的是Func&lt;DateTimeOffset, CancellationToken, Task&gt; Action&lt;DateTimeOffset&gt;,像这样:

    ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
        Func<DateTimeOffset, CancellationToken, Task> action, 
        CancellationToken cancellationToken)
    {
        // Validate parameters.
        if (action == null) throw new ArgumentNullException("action");
    
        // Declare the block variable, it needs to be captured.
        ActionBlock<DateTimeOffset> block = null;
    
        // Create the block, it will call itself, so
        // you need to separate the declaration and
        // the assignment.
        // Async so you can wait easily when the
        // delay comes.
        block = new ActionBlock<DateTimeOffset>(async now => {
            // Perform the action.  Wait on the result.
            await action(now, cancellationToken).
                // Doing this here because synchronization context more than
                // likely *doesn't* need to be captured for the continuation
                // here.  As a matter of fact, that would be downright
                // dangerous.
                ConfigureAwait(false);
    
            // Wait.
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
                // Same as above.
                ConfigureAwait(false);
    
            // Post the action back to the block.
            block.Post(DateTimeOffset.Now);
        }, new ExecutionDataflowBlockOptions { 
            CancellationToken = cancellationToken
        });
    
        // Return the block.
        return block;
    }
    

    当然,最好将CancellationToken 编织到您的方法(如果它接受一个),这在此处完成。

    这意味着您将拥有一个带有以下签名的DoWorkAsync 方法:

    Task DoWorkAsync(CancellationToken cancellationToken);
    

    您必须更改(只是轻微地,并且您不会在这里放弃关注点分离)StartWork 方法来解释传递给 CreateNeverEndingTask 方法的新签名,如下所示:

    void StartWork()
    {
        // Create the token source.
        wtoken = new CancellationTokenSource();
    
        // Set the task.
        task = CreateNeverEndingTask((now, ct) => DoWorkAsync(ct), wtoken.Token);
    
        // Start the task.  Post the time.
        task.Post(DateTimeOffset.Now, wtoken.Token);
    }
    

    【讨论】:

    • 你好,我正在尝试这个实现,但我遇到了问题。如果我的 DoWork 没有参数,task = CreateNeverEndingTask(now => DoWork(), wtoken.Token);给我一个构建错误(类型不匹配)。另一方面,如果我的 DoWork 采用 DateTimeOffset 参数,那么同一行会给我一个不同的构建错误,告诉我 DoWork 的重载不接受 0 个参数。你能帮我解决这个问题吗?
    • 实际上,我通过在分配任务的行中添加强制转换并将参数传递给 DoWork 解决了我的问题: task = (ActionBlock)CreateNeverEndingTask(now => DoWork(now), wtoken.Token);
    • 您也可以更改“ActionBlock 任务的类型;”到 ITargetBlock 任务;
    • 我相信这很可能会永远分配内存,从而最终导致溢出。
    • @NateGardner 在哪一部分?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-29
    • 2021-05-05
    • 2011-11-01
    • 2017-11-04
    • 2018-09-28
    • 1970-01-01
    相关资源
    最近更新 更多