【问题标题】:Parallel tasks vs delegates并行任务与委托
【发布时间】:2013-01-16 20:35:01
【问题描述】:

我现在需要决定如何构建我的异步代码。 我需要从我的函数 Func1() 一次调用 20 个不同的 Web 服务,当它们都返回一个 xml 答案时,将所有结果加入一个大 xml。

我考虑过使用 TPL 任务。像这样:

var task = Task.Factory.StartNew (call the web service1...);
var task2 = Task.Factory.StartNew (call the web service2...);
var task3 = Task.Factory.StartNew (call the web service3...);

task.WaitAll();

听起来不错还是有更好的方法来完成工作?

【问题讨论】:

  • 是的,有一种更好的方法 - 使调用异步 - 现在你并行执行它们,但在等待结果时阻塞多达 3(或 20 个?)线程 - 如果你进行异步调用不会阻止任何
  • 我不会说async 一定更好,而是不同,它的优越性取决于具体情况。在这种情况下,“调用 Web 服务”可能是一个相当轻量级的操作,因此线程 (Task.Factory.StartNew) 的开销可能是不合理的。
  • 根据问题,xml 的组合应该作为输出,所以调用必须等到每个请求都应该完成。在最大情况下,所有进程都可以推送到后台线程。因为它已经是平行的,所以这部分没有什么可做的。我不认为完整的 asyc 会在这里工作。

标签: c# c#-4.0 task-parallel-library


【解决方案1】:

几个月前我们需要这样的东西来同时处理多个远程 URL。我们通过从SemaphoreSlim 类派生我们自己的类来实现这一点。

你可以这样实现:

/// <summary>
    /// Can be used to process multiple URL's concurrently.
    /// </summary>
    public class ConcurrentUrlProcessor : SemaphoreSlim
    {
        private int initialCount;
        private int maxCount;
        private readonly HttpClient httpClient;

        /// <summary>
        /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount)
            :base(initialCount)
        {
            this.initialCount = initialCount;
            this.maxCount = int.MaxValue;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial and maximum number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        /// <param name="maxCount">The maximum number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount, int maxCount)
            : base(initialCount, maxCount) 
        {
            this.initialCount = initialCount;
            this.maxCount = maxCount;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Starts the processing.
        /// </summary>
        /// <param name="urls">The urls.</param>
        /// <returns>Task{IEnumerable{XDocument}}.</returns>
        public virtual async Task<IEnumerable<XDocument>> StartProcessing(params string[] urls)
        {
            List<Task> tasks = new List<Task>();
            List<XDocument> documents = new List<XDocument>();

            SemaphoreSlim throttler = new SemaphoreSlim(initialCount, maxCount);
            foreach (string url in urls)
            {
                await throttler.WaitAsync();

                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        string xml = await this.httpClient.GetStringAsync(url);

                        //move on to the next page if no xml is returned.
                        if (string.IsNullOrWhiteSpace(xml))
                            return;

                        var document = XDocument.Parse(xml);
                        documents.Add(document);
                    }
                    catch (Exception)
                    {
                        //TODO: log the error or do something with it.
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
            }

            await Task.WhenAll(tasks);

            return documents;
        }
    }

以及相应的单元测试:

    [Test]
    public async void CanProcessMultipleUrlsTest()
    {
        string[] urls = new string[] 
        {
            "http://google.nl",
            "http://facebook.com",
            "http://linkedin.com",
            "http://twitter.com" 
        };

        IEnumerable<XDocument> documents = null;
        ConcurrentUrlProcessor processor = new ConcurrentUrlProcessor(100);

        documents = await processor.StartProcessing(urls);

        Assert.AreEqual(4, documents.Count());
    }

【讨论】:

    【解决方案2】:

    我想到了两种方法。

    一个。您现在的方式是使用延续 ContinueWith/ContinueWhenAll,如 this answer 和此 articale 中所述。因此,对于您的情况,您可能会使用子任务的单个延续,所以

    TaskCreationoptions op = TaskCreationOptions.AttachedToParent;
    Task.Factory.StartNew(() => 
    {
        var task1 = Task.Factory.StartNew (CallService(1));
        var task2 = Task.Factory.StartNew (CallService(2));
        var task3 = Task.Factory.StartNew (CallService(3));
    })
    .ContinueWith(ant => { SomeOtherselegate });
    

    或者您可以按照here 的说明链接延续。

    另一种方法是使用ContinueWhenAll

    var task1 = Task.Factory.StartNew (CallService(1));
    var task2 = Task.Factory.StartNew (CallService(2));
    var task3 = Task.Factory.StartNew (CallService(3));
    var continuation = Task.Factory.ContinueWhenAll(
        new[] { task1, task2, task3 }, tasks => Console.WriteLine("Done!"));
    

    这里唯一要考虑的是你可以有可变数量的任务,但这很容易,我会让你解决这个问题。

    B。另一种方法是使用.NET4.5+ 和async/await。所以你的代码会像

    private async void CallAllServicesAsync()
    {
        await CallServiceAsync(1);
        await CallServiceAsync(2);
        await CallServiceAsync(3);
    }
    

    在哪里

    private Task CallServiceAsync(int serviceNumber)
    {
        return Task.Run(() = > { SomeMethod(); });
    }
    

    上面的代码相当于显示的第一个代码,但框架会在后台为您处理所有事情。

    我希望这会有所帮助。

    【讨论】:

      【解决方案3】:

      我在这里只能想到两种主要方法:

      1. 有一个地方可以在收到它后立即聚合。这可以通过 ContinueWith() 方法来完成。但是,您需要在聚合代码中处理同步,最后您仍然需要等到所有任务完成。因此,只有在聚合需要很长时间并且可以并行完成时,这种方法才有意义。

      2. 你做的方式 - 又好又简单:)

      至于使用 async,我会投票赞成使用TaskFactory.ContinueWhenAll() 方法。这样你就不会阻塞任何线程,代码看起来比有多个等待更好(取决于口味)并且可能会有更少的开销(可能取决于实现)。

      【讨论】:

        【解决方案4】:

        使用async 的正确方法是首先定义一个自然异步的CallServiceAsync(即使用HttpClient 或者是TaskFactory.FromAsync 围绕Begin/End 方法的包装器)。它应该使用Task.Run

        一旦你有一个自然异步的CallServiceAsync,那么你可以同时发出 20 个调用并(异步地)等待它们:

        public async Task<XDocument> GetAllDataAsync()
        {
          var task1 = CallServiceAsync(1);
          var task2 = CallServiceAsync(2);
          var task3 = CallServiceAsync(3);
          ...
          XDocument[] results = await task.WhenAll(task1, task2, task3, ...);
          return JoinXmlDocuments(results);
        }
        

        这种方法根本不会阻塞任何线程。

        您可以通过使用ConfigureAwait(false) 来提高性能:

        XDocument[] results = await task.WhenAll(task1, task2, task3, ...).ConfigureAwait(false);
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2010-11-10
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-01-12
          相关资源
          最近更新 更多