【问题标题】:Test run stops when doing several multi-threaded tests in a row连续执行多个多线程测试时测试运行停止
【发布时间】:2020-07-06 11:36:33
【问题描述】:

我有一个带有静态 ConcurrentQueue 的类。一个类接收消息并将它们放入队列中,而该类中的另一个线程从该队列中读取它们并一次处理它们。该方法因取消令牌而中止。

清空队列的方法如下:

public async Task HandleEventsFromQueueAsync(CancellationToken ct, int pollDelay = 25)
{
    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        if(messageQueue.TryDequeue(out ConsumeContext newMessage))
        {
            handler.Handle(newMessage);
        }

        try
        {
            await Task.Delay(pollDelay, ct).ConfigureAwait(true);
        } 
        catch (TaskCanceledException)
        {
            return;
        }
    }
}

我的测试方法如下:

CancellationToken ct = source.Token;
Thread thread = new Thread(async () => await sut.HandleEventsFromQueueAsync(ct));
thread.Start();

EventListener.messageQueue.Enqueue(message1);
EventListener.messageQueue.Enqueue(message2);
await Task.Delay(1000);
source.Cancel(false);

mockedHandler.Verify(x => x.Handle(It.IsAny<ConsumeContext>()), Times.Exactly(2));

所以我在自己的线程中开始我的出队方法,并使用新的取消令牌。然后我将几条消息排入队列,给进程一秒钟来处理它们,然后使用 source.Cancel(false) 结束线程并使方法返回。然后我检查处理程序的调用次数是否正确。当然,我正在用不同的消息类型和不同的时间中止出队方法来测试它。

问题是,当我单独运行任何测试时,它们都成功了。但是当我尝试将它们作为一个组运行时,Visual Studio 不会运行每个测试。没有错误消息,并且它运行的测试正常成功,但运行在第二次测试后停止。

我不知道为什么会发生这种情况。我的测试在结构上都是相同的。我每次都正确中止出队线程。

什么可以迫使 Visual Studio 停止测试运​​行而不引发任何类型的错误?

【问题讨论】:

  • 所有测试都使用相同的CancellationTokenSource 吗?不包括 CancellationTokenSource source 变量的声明。
  • 源在 TestInitialize 方法中初始化。 source = new CancellationTokenSource(); 只是在类范围内声明。
  • 那么,如果您在一个测试中取消source.Token,您将取消所有测试,因为它是同一个令牌。每次测试都需要不同的令牌
  • @mortb 我正在为每个测试创建一个新来源和一个新令牌。只是在 TestInitialize 中声明了源(在每次测试之前运行),并且每次在测试本身中都会创建令牌,但对于每个测试来说两者都应该是新鲜的..
  • 较新的代码往往不使用ThreadTask.Run(...) *.com/a/13429164/1257728 当您编写Thread(async () =&gt; await sut.HandleEventsFromQueueAsync(ct)); 时调用HandleEventsFromQueueAsync async void 这将使您的进程崩溃,但由于代码被称为“async void”异常未被调用代码*.com/a/45448104/1257728

标签: c# multithreading visual-studio unit-testing test-runner


【解决方案1】:

您将异步 lambda 传递给 Thread 构造函数。 Thread 构造函数不理解异步委托(不接受 Func&lt;Task&gt; 参数),所以你最终得到一个 async void lambda。 Async void 方法 should be avoided 用于任何不是事件处理程序的东西。在您的情况下发生的情况是,当代码到达第一个 await 时,显式创建的线程被终止,而主体的其余部分在 ThreadPool 线程中运行。似乎代码永远不会因异常而失败,否则进程会崩溃(这是 async void 方法的默认行为)。

建议:

  1. 使用Task 而不是Thread。这样您就可以在退出测试之前向await 发送消息。
CancellationToken ct = source.Token;
Task consumerTask = Task.Run(() => sut.HandleEventsFromQueueAsync(ct));

EventListener.messageQueue.Enqueue(message1);
EventListener.messageQueue.Enqueue(message2);
await Task.Delay(1000);
source.Cancel(false);
await consumerTask; // Wait the task to complete

mockedHandler.Verify(x => x.Handle(It.IsAny<ConsumeContext>()), Times.Exactly(2));
  1. 考虑使用BlockingCollection 或异步队列,如Channel,而不是ConcurrentQueue。轮询是一种笨拙且低效的技术。使用阻塞或异步队列,您将不必执行循环等待新消息到达。您将能够进入waiting 状态,并在收到新消息时立即通知。

  2. 使用ConfigureAwait(false) 配置等待。 ConfigureAwait(true) 是默认值,什么都不做。

  3. 考虑通过抛出 OperationCanceledException 来传播取消。这是 .NET 中传播取消的standard way。所以而不是:

if (ct.IsCancellationRequested) return;

...最好这样做:

ct.ThrowIfCancellationRequested();

【讨论】:

  • 感谢您的帮助,以及所有的建议。我使用 ConcurrentQueue 的原因是因为我需要我的生产者知道他们的消息何时被消费。我还没有研究 BlockingCollection,但据我所知,Channels 不允许这种行为。
  • @KeizerHarm 消息被消费后如何通知生产者?通过定期枚举队列?这也将是非常低效的。在生产者和消费者之间建立通信的更好方法可能是在每条消息中传递TaskCompletionSource。生产者发送完消息后awaitTaskCompletionSource.Task属性完成,消费者调用TaskCompletionSource.SetResult方法发送消息已被消费的信号。
  • 非常感谢;听起来这可能是我需要的!我确实也在从另一边轮询队列。性能并不是很重要;无论如何,我不希望每分钟收到超过一条消息,但从概念上讲,这样一个实体听起来更合适,所以我会考虑使用它。
  • @KeizerHarm 我的荣幸!小提示:在构造TaskCompletionSource 时,您可以考虑将RunContinuationsAsynchronously 选项传递给构造函数。这通常会使事情变得更简单,因为它分离了信号的发送和接收。没有它,在消费者站点调用SetResult 的同一线程将立即继续在生产者站点运行Taskawait 之后的代码,然后返回运行SetResult 之后的代码在消费者的网站上。
【解决方案2】:

我已经解决了我自己的问题。结果是新创建的线程抛出了异常,当线程抛出异常时,这些异常被忽略,但它们仍然阻止单元测试的发生。解决导致异常的问题后,测试工作正常。

【讨论】:

  • 当你在那里时删除ConfigureAwait(true)。它没有做任何事情(有用)。
最近更新 更多