【问题标题】:How to execute multiple parallel tasks on completion of a prior task如何在完成先前任务后执行多个并行任务
【发布时间】:2017-03-29 23:39:46
【问题描述】:

我有一种情况,我需要调用 Web 服务,并且在成功完成后,对从 Web 服务返回的结果执行多项操作。我已经开发了“有效”的代码——只是不像我想要的那样。具体来说,我想从对 Web 服务的调用中获取结果,并将这些结果传递给要并行执行的多个连续任务,但我目前拥有的是在开始第二个任务之前执行第一个连续任务。

我整理了一个非常简化的示例来说明我目前正在做的事情,希望能帮助说明这种情况。一、实现:

public interface IConfigurationSettings
{
    int? ConfigurationSetting { get; set; }
}

public interface IPrintCommandHandler
{
    System.Threading.Tasks.Task<bool> ExecuteAsync(byte[] reportContent);
}

public interface ISaveCommandHandler
{
    System.Threading.Tasks.Task<bool> ExecuteAsync(byte[] reportContent);
}

public interface IWebService
{
    System.Threading.Tasks.Task<object> RetrieveReportAsync(string searchToken, string reportFormat);
}

public class ReportCommandHandler
{
    private readonly IConfigurationSettings _configurationSettings;
    private readonly IPrintCommandHandler _printCommandHandler;
    private readonly ISaveCommandHandler _saveCommandHandler;
    private readonly IWebService _webService;

    public ReportCommandHandler(IWebService webService, IPrintCommandHandler printCommandHandler, ISaveCommandHandler saveCommandHandler, IConfigurationSettings configurationSettings)
    {
        _webService = webService;
        _printCommandHandler = printCommandHandler;
        _saveCommandHandler = saveCommandHandler;
        _configurationSettings = configurationSettings;
    }

    public async Task<bool> ExecuteAsync(string searchToken)
    {
        var reportTask = _webService.RetrieveReportAsync(searchToken, "PDF");
        var nextStepTasks = new List<Task<bool>>();

        // Run "print" task after report task.
        var printTask = await reportTask.ContinueWith(task => _printCommandHandler.ExecuteAsync((byte[]) task.Result));
        nextStepTasks.Add(printTask);

        // Run "save" task after report task.
        if (_configurationSettings.ConfigurationSetting.HasValue)
        {
            var saveTask = await reportTask.ContinueWith(task => _saveCommandHandler.ExecuteAsync((byte[]) task.Result));
            nextStepTasks.Add(saveTask);
        }

        var reportTaskResult = await Task.WhenAll(nextStepTasks);
        return reportTaskResult.Aggregate(true, (current, result) => current & result);
    }
}

因此,Web 服务(第三方,与我无关)有一个用于执行搜索/查找的端点,如果成功,则返回一个参考号(我在示例中将其称为搜索令牌)。然后,此参考号用于以多种不同格式中的任何一种检索查找结果(使用不同的端点)。

本示例中的IWebService 接口代表我创建的用于管理与Web 服务交互的应用程序服务。实际的实现还有其他方法可以进行查找、ping 等操作。

为了让事情更有趣,其中一个后续任务是必需的(将始终在主要任务之后执行),但另一个后续任务是可选的,执行取决于应用程序中其他地方设置的配置设置。

为了更容易地演示这个问题,我创建了一个单元测试:

public class RhinoMockRepository : IDisposable
{
    private readonly ArrayList _mockObjectRepository;

    public RhinoMockRepository()
    {
        _mockObjectRepository = new ArrayList();
    }

    public T CreateMock<T>() where T : class
    {
        var mock = MockRepository.GenerateMock<T>();
        _mockObjectRepository.Add(mock);
        return mock;
    }

    public T CreateStub<T>() where T : class
    {
        return MockRepository.GenerateStub<T>();
    }

    public void Dispose()
    {
        foreach (var obj in _mockObjectRepository) obj.VerifyAllExpectations();
        _mockObjectRepository.Clear();
    }
}

[TestFixture]
public class TapTest
{
    private const string SearchToken = "F71C8B50-ECD1-4C02-AD3F-6C24F1AF3D9A";

    [Test]
    public void ReportCommandExecutesPrintAndSave()
    {
        using (var repository = new RhinoMockRepository())
        {
            // Arrange
            const string reportContent = "This is a PDF file.";
            var reportContentBytes = System.Text.Encoding.Default.GetBytes(reportContent);

            var retrieveReportResult = System.Threading.Tasks.Task.FromResult<object>(reportContentBytes);
            var webServiceMock = repository.CreateMock<IWebService>();
            webServiceMock.Stub(x => x.RetrieveReportAsync(SearchToken, "PDF")).Return(retrieveReportResult);

            var printCommandHandlerMock = repository.CreateMock<IPrintCommandHandler>();
            var printResult = System.Threading.Tasks.Task.FromResult(true);
            printCommandHandlerMock
                .Expect(x => x.ExecuteAsync(reportContentBytes))
                //.WhenCalled(method => System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2)))
                .Return(printResult);

            var configurationSettingsStub = repository.CreateStub<IConfigurationSettings>();
            configurationSettingsStub.ConfigurationSetting = 10;

            var saveCommandHandlerMock = repository.CreateMock<ISaveCommandHandler>();
            var saveResult = System.Threading.Tasks.Task.FromResult(true);
            saveCommandHandlerMock.Expect(x => x.ExecuteAsync(reportContentBytes)).Return(saveResult);

            // Act
            var reportCommandHandler = new ReportCommandHandler(webServiceMock, printCommandHandlerMock, saveCommandHandlerMock, configurationSettingsStub);
            var result = System.Threading.Tasks.Task
                .Run(async () => await reportCommandHandler.ExecuteAsync(SearchToken))
                .Result;

            // Assert
            Assert.That(result, Is.True);
        }
    }
}

理想情况下,在完成对IWebService 上的RetrieveReportAsync() 的调用后,应同时执行“打印”和“保存”命令处理程序,并收到来自RetrieveReportAsync() 的结果副本。但是,如果在单元测试中对WhenCalled... 的调用未注释,并且在逐步执行ReportCommandHandler.ExecuteAsync() 时,您可以看到“打印”命令在到达“保存”之前执行并完成命令。现在,我知道await 的全部意义在于暂停调用async 方法的执行,直到awaited 代码完成,但我不清楚如何实例化“打印”和“保存”命令(任务)作为“报告”任务的延续,以便在“报告”任务完成时它们都并行执行,然后“报告”命令能够返回基于结果的结果“打印”和“保存”命令(任务)。

【问题讨论】:

    标签: c# .net multithreading asynchronous parallel-processing


    【解决方案1】:

    您的问题实际上涉及解决两个不同的目标:

    1. 如何等待任务?
    2. 如何同时执行另外两个任务?

    我发现在您的代码中混合使用 awaitContinueWith() 令人困惑。我不清楚你为什么这样做。 await 为您做的关键事情之一是自动设置延续,因此您不必显式调用 ContinueWith()。然而,你还是会这样做。

    假设这只是一个错误,由于缺乏对如何实现目标的充分理解,我会这样写你的方法:

    public async Task<bool> ExecuteAsync(string searchToken)
    {
        var reportTaskResult = await _webService.RetrieveReportAsync(searchToken, "PDF");
        var nextStepTasks = new List<Task<bool>>();
    
        // Run "print" task after report task.
        var printTask = _printCommandHandler.ExecuteAsync((byte[]) reportTaskResult);
        nextStepTasks.Add(printTask);
    
        // Run "save" task after report task.
        if (_configurationSettings.ConfigurationSetting.HasValue)
        {
            var saveTask = _saveCommandHandler.ExecuteAsync((byte[]) reportTaskResult);
            nextStepTasks.Add(saveTask);
        }
    
        var reportTaskResult = await Task.WhenAll(nextStepTasks);
        return reportTaskResult.Aggregate(false, (current, result) => current | result);
    }
    

    也就是说,先做await原来的任务。然后你知道它已经完成并得到了结果。那时,继续开始其他任务,将他们的 Task 对象添加到您的列表中,但不会单独等待每个任务。最后,await 整个任务列表。

    【讨论】:

    • 谢谢,这会有所帮助。我想我是通过明确调用ContinueWith() 而偏离了主要任务,而我没有意识到它实际上并不需要。还在"Executing tasks in parallel" 找到了一个有关我将试验的并行性的相关问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-28
    • 2016-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多