【问题标题】:Always Running Threads on Windows Service始终在 Windows 服务上运行线程
【发布时间】:2014-07-28 18:23:51
【问题描述】:

我正在编写一个 Windows 服务,它将启动多个工作线程,这些线程将侦听 Amazon SQS 队列并处理消息。大约有 20 个线程监听 10 个队列。

线程必须始终运行,这就是为什么我倾向于将实际线程用于工作循环而不是线程池线程。

这是一个顶级实现。 Windows 服务将启动多个工作线程,每个工作线程都会监听它的队列并处理消息。

protected override void OnStart(string[] args)
{
     for (int i = 0; i < _workers; i++)
   {
      new Thread(RunWorker).Start();
   }
}

这里是工作的实现

public async void RunWorker()
{
  while(true)
  {
    // .. get message from amazon sqs sync.. about 20ms
    var message = sqsClient.ReceiveMessage();

    try
    {
       await PerformWebRequestAsync(message);
       await InsertIntoDbAsync(message);
    }
    catch(SomeExeception)
    {
       // ... log
       //continue to retry
       continue;
    }
    sqsClient.DeleteMessage();
  }
}

我知道我可以使用 Task.Run 执行相同的操作并在线程池线程上执行它,而不是启动单个线程,但我看不出这样做的原因,因为每个线程都会一直在运行。

您是否发现此实现有任何问题?让线程始终以这种方式运行有多可靠,我可以做些什么来确保每个线程始终在运行?

【问题讨论】:

    标签: c# multithreading windows-services async-await threadpool


    【解决方案1】:

    您现有解决方案的一个问题是您以一种即发即弃的方式调用您的RunWorker,尽管是在一个新线程上(即new Thread(RunWorker).Start())。

    RunWorker 是一个async 方法,当执行点到达第一个await(即await PerformWebRequestAsync(message))时,它会返回给调用者。如果PerformWebRequestAsync 返回一个挂起的任务,RunWorker 返回并且你刚刚启动的新线程终止。

    我认为您根本不需要一个新线程,只需使用AmazonSQSClient.ReceiveMessageAsyncawait 其结果即可。另一件事是你不应该使用async void 方法,除非你真的不关心跟踪异步任务的状态。请改用async Task

    您的代码可能如下所示:

    List<Task> _workers = new List<Task>();
    CancellationTokenSource _cts = new CancellationTokenSource();
    
    protected override void OnStart(string[] args)
    {
      for (int i = 0; i < _MAX_WORKERS; i++)
      {
        _workers.Add(RunWorkerAsync(_cts.Token)); 
      }
    }
    
    public async Task RunWorkerAsync(CancellationToken token)
    {
      while(true)
      {
        token.ThrowIfCancellationRequested();
    
        // .. get message from amazon sqs sync.. about 20ms
        var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);
    
        try
        {
           await PerformWebRequestAsync(message);
           await InsertIntoDbAsync(message);
        }
        catch(SomeExeception)
        {
           // ... log
           //continue to retry
           continue;
        }
        sqsClient.DeleteMessage();
      }
    }
    

    现在,要停止所有待处理的工作人员,您可以简单地执行以下操作(从主“请求调度程序”线程):

    _cts.Cancel();
    try
    {
        Task.WaitAll(_workers.ToArray()); 
    }
    catch (AggregateException ex) 
    {
        ex.Handle(inner => inner is OperationCanceledException);
    }
    

    注意,ConfigureAwait(false) 对于 Windows 服务是可选的,因为默认情况下初始线程上没有同步上下文。但是,我会保持这种方式使代码独立于执行环境(对于存在同步上下文的情况)。

    最后,如果由于某种原因你不能使用ReceiveMessageAsync,或者你需要调用另一个阻塞API,或者干脆在RunWorkerAsync开头做一些CPU密集型工作,只要用Task.Run 包裹它(而不是包裹整个RunWorkerAsync):

    var message = await Task.Run(
        () => sqsClient.ReceiveMessage()).ConfigureAwait(false);
    

    【讨论】:

      【解决方案2】:

      好吧,我会使用 CancellationTokenSource 在服务中实例化并传递给工作人员。您的 while 语句将变为:

      while(!cancellationTokenSource.IsCancellationRequested)
      {
        //rest of the code
      }
      

      这样您就可以从OnStop 服务方法中取消所有工作人员。

      另外,你应该注意:

      1. 如果您从线程外部处理线程状态,则可能会抛出 ThreadStateExceptionThreadInterruptedException 或其他其中之一。因此,您需要处理正确的线程重启。
      2. worker 是否需要在迭代之间不间断地运行?我会在那里睡觉(甚至几毫秒),这样他们就不会让 CPU 无所事事。
      3. 如果发生,您需要处理ThreadStartException 并重新启动worker。

      除此之外,没有任何理由说明这 10 个踏板不能在服务运行期间(一次数天、数周、数月)持续运行。

      【讨论】:

      • 感谢您的回答,您建议如何正确重启线程?
      • @Sergey:我在想,在异常处理代码中,您只需要首先通知启动线程的组件,然后清理它(如果有的话)并启动另一个。跨度>
      猜你喜欢
      • 2020-02-05
      • 1970-01-01
      • 2020-04-29
      • 1970-01-01
      • 1970-01-01
      • 2011-02-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多