【问题标题】:Asynchronous task processor异步任务处理器
【发布时间】:2020-04-24 11:20:13
【问题描述】:

我正在开发一个异步任务处理器。我需要高性能处理器,所以使用的同步原语应该尽可能低级。处理器应该持有一个线程,在没有任务时休眠,在任务出现时唤醒。任务处理和任务添加应该在不同的线程中执行。

我尝试使用AutoResetEvent 实现,但它有竞争条件:

public class Processor
{
    ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
    AutoResetEvent _newWorkItemAutoResetEvent = new AutoResetEvent(false);
    private bool _disposed;
    Thread _thread;

    public void Do(Action action)
    {
        _workItemQueue.Enqueue(action);
        _newWorkItemAutoResetEvent.Set();
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_disposed)
            {
                _newWorkItemAutoResetEvent.WaitOne(); // 
                while (_workItemQueue.TryDequeue(out Action action))
                {
                    action();
                }
                // at this "bad" moment another thread calls Do method. 
                // New action has been enqueued, but when we call
                // _newWorkIteManualAutoEvent.WaitOne() we fall asleep.
            }
        });
        _thread.Start();
    }
}

然后我尝试使用ManualResetEvent

public class Processor
{
    ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
    ManualResetEventSlim _newWorkItemManualResetEvent = new ManualResetEventSlim(false);
    private bool _disposed;
    Thread _thread;

    public void Do(Action action)
    {
        _workItemQueue.Enqueue(action);
        _newWorkItemManualResetEvent.Set();
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_disposed)
            {
                _newWorkItemManualResetEvent.WaitOne();
                _newWorkItemManualResetEvent.Reset();

                while (_workItemQueue.TryDequeue(out Action action))
                {
                    action();
                }
            }
        });
        _thread.Start();
    }
}

我没有看到使用 ManualResetEvent 实现的任何竞争条件。

问题:我说的对吗?或者我需要另一个同步原语?我正在考虑 CountupEvent(反向 CountdownEvent)。它在其计数大于零时发出信号,而在其计数等于零时不发出信号。 Countup事件的计数对应于要执行的任务数。

【问题讨论】:

  • 关于您想要改进的工作实施的问题必须在codereview提出。
  • “我需要高性能处理器,所以使用的同步原语应该尽可能低级” - 如果性能很重要你不会使用 C#。 NET 因为 JIT 编译和 GC 引入的延迟。 .NET 的内置异步和线程池功能的性能开销是最小的,并且已经过试验和测试。您自己的代码实际上非常低效,因为您盲目地创建新线程并且线程在 Windows 上很昂贵 - 更不用说您是如何使用 WaitOne() 阻塞这些线程的。你为什么不使用线程池?
  • 这似乎是“并发任务处理器”而不是“异步任务处理器”。
  • @Dai,我只需要 C# 实现,尽可能高性能。在我的实现中,一个处理器只创建一个线程。我不打算创建很多处理器。
  • 调用处理器“异步”的问题是它可能会产生错误的期望。如今,“异步”这个词与Task-based asynchronous pattern 密切相关。人们期望处理器返回 Task 对象,或接受 Func&lt;Task&gt; 委托,或两者兼而有之。

标签: c# multithreading asynchronous


【解决方案1】:

方便的BlockingCollection 将为您处理大部分问题。

类似:

public sealed class Processor : IDisposable
{
    //set a max queue depth to provide back pressure to the request rate
    BlockingCollection<Action> _workItemQueue = new BlockingCollection<Action>(32);
    private bool _disposed = false;
    private Thread _workerThread;
    private CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();

    public void Do(Action action)
    {
        _workItemQueue.Add(action);
    }

    public void Dispose()
    {
        if (!_disposed)
        {
            _workItemQueue.CompleteAdding();
            _cancelTokenSource.Cancel();
            _disposed = true;
        }
    }

    public Processor()
    {
        _workerThread = new Thread(() =>
        {
            while (!_workItemQueue.IsCompleted)
            {
                if (_workItemQueue.TryTake(out Action action, 1000*2,_cancelTokenSource.Token))
                {
                    action();
                }
            }

        });

        _workerThread.Start();
    }
}

【讨论】:

  • 1) _workerThread 会在没有任务的时候一直自旋,消耗 CPU 资源。是不是? 2) 你确定你的 BlockingCollection 实现会比我的 ManualResetEvent 实现更快吗?
  • 它不应该更快。它应该是正确的:)。即更简单,更不容易发现难以发现的并发问题。并且每隔一两秒就停止等待以进行簿记具有微不足道的 CPU 成本。 2000 毫秒对于 CPU 来说是永恒的。
  • 不使用IsCompleted+TryTake 组合,枚举集合的更好方法是使用GetConsumingEnumerable
  • 这非常好,尤其是与 Parallel.Foreach 组合时,但我喜欢 TryTake 的原因与@IgorBuchelnikov 反对的原因相同。您可以定期使等待超时并更新遥测、日志记录,并证明线程没有被阻塞或在其中一个工作项上旋转。
  • 这是一个有效的论点。使用IsCompleted+TryTake 的一个小缺点是cancellationToken 插槽现在用于处理器的内部管理,如果您想支持调用者取消,则必须创建一个链接的CTS。诚然,这没什么大不了的。
猜你喜欢
  • 2011-04-11
  • 1970-01-01
  • 2015-07-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-08
相关资源
最近更新 更多