【问题标题】:Run x number of web requests simultaneously同时运行 x 个 Web 请求
【发布时间】:2011-10-24 16:12:11
【问题描述】:

我们公司有一个 Web 服务,我想通过我自己的 C# 中的 HTTPWebRequest 客户端发送 XML 文件(存储在我的驱动器上)。这已经奏效了。 Web 服务同时支持 5 个同步请求(一旦服务器上的处理完成,我会收到来自 Web 服务的响应)。每个请求的处理大约需要 5 分钟。

抛出太多请求 (> 5) 会导致我的客户端超时。此外,这可能会导致服务器端出现错误和数据不连贯。在服务器端进行更改不是一种选择(来自不同的供应商)。

现在,我的 Webrequest 客户端将发送 XML 并使用 result.AsyncWaitHandle.WaitOne(); 等待响应

但是,这种方式一次只能处理一个请求,尽管 Web 服务支持 5 个。我尝试使用 BackgroundworkerThreadpool 但它们同时创建了太多请求,这对我来说毫无用处.任何建议,如何解决这个问题?创建我自己的 Threadpool 正好有 5 个线程?有什么建议,如何实现?

【问题讨论】:

    标签: c# multithreading asynchronous webrequest


    【解决方案1】:

    简单的方法是创建 5 个线程(除此之外:这是一个奇数!),它们使用来自 BlockingCollection 的 xml 文件。

    类似:

    var bc = new BlockingCollection<string>();
    
    for ( int i = 0 ; i < 5 ; i++ )
    {
        new Thread( () =>
            {
                foreach ( var xml in bc.GetConsumingEnumerable() )
                {
                    // do work
                }
            }
        ).Start();
    }
    
    bc.Add( xml_1 );
    bc.Add( xml_2 );
    ...
    bc.CompleteAdding(); // threads will end when queue is exhausted
    

    【讨论】:

      【解决方案2】:

      如果您使用的是 .Net 4,这看起来非常适合 Parallel.ForEach()。您可以设置它的MaxDegreeOfParallelism,这意味着您可以保证一次不会再处理任何项目。

      Parallel.ForEach(items,
                       new ParallelOptions { MaxDegreeOfParallelism = 5 },
                       ProcessItem);
      

      这里,ProcessItem 是一种通过访问您的服务器并阻塞直到处理完成来处理一个项目的方法。如果需要,您可以使用 lambda。

      【讨论】:

        【解决方案3】:

        创建您自己的五个线程的线程池并不棘手 - 只需创建一个描述要发出的请求的对象并发队列,并让五个线程根据需要循环执行任务。添加一个 AutoResetEvent,您可以确保它们不会在没有需要处理的请求时疯狂旋转。

        将响应返回给正确的调用线程可能很棘手。如果您的其余代码的工作方式是这种情况,我会采用不同的方法并创建一个限制器,它的作用有点像监视器,但允许 5 个同时线程而不是仅一个:

        private static class RequestLimiter
        {
          private static AutoResetEvent _are = new AutoResetEvent(false);
          private static int _reqCnt = 0;
          public ResponseObject DoRequest(RequestObject req)
          {
            for(;;)
            {
              if(Interlocked.Increment(ref _reqCnt) <= 5)
              {
                //code to create response object "resp".
                Interlocked.Decrement(ref _reqCnt);
                _are.Set();
                return resp;
              }
              else
              {
                  if(Interlocked.Decrement(ref _reqCnt) >= 5)//test so we don't end up waiting due to race on decrementing from finished thread.
                   _are.WaitOne();
              }
            }
          }
        }
        

        【讨论】:

        • 吹毛求疵:我认为while(true)for(;;) 更能表达“无限”循环的意图。特别是对于新手来说,你的方式可能会令人困惑。
        • 另外,您可以使用框架中的Semaphore(或SemaphoreSlim),而不是自己实现。
        • @svick 好吧,我们中的很多人从 K&R 那里学到了 for(;;) 的成语,并将其读作“永远”。认为这是对 Dennis Ritchie 的致敬 :) 至于 SemaphoreSlim,好点 - 前段时间我不得不实现自己的而不是使用它是有原因的,这导致我忘记了它,但是是的,这是一个很好的解决方案.
        【解决方案4】:

        您可以编写一个小辅助方法,它会阻塞当前线程,直到所有线程都完成执行给定的操作委托。

        static void SpawnThreads(int count, Action action)
        {
            var countdown = new CountdownEvent(count);
        
            for (int i = 0; i < count; i++)
            {
                new Thread(() =>
                {
                    action();
                    countdown.Signal();
                }).Start();
            }
        
            countdown.Wait();
        }
        

        然后使用BlockingCollection&lt;string&gt;(线程安全集合)来跟踪您的 xml 文件。通过使用上面的辅助方法,您可以编写如下内容:

        static void Main(string[] args)
        {
            var xmlFiles = new BlockingCollection<string>();
        
            // Add some xml files....
        
            SpawnThreads(5, () =>
            {
                using (var web = new WebClient())
                {
                    web.UploadFile(xmlFiles.Take());
                }
            });
        
            Console.WriteLine("Done");
            Console.ReadKey();
        }
        

        更新

        更好的方法是异步上传文件,这样您就不会在使用线程执行 IO 任务时浪费资源。

        你可以再写一个辅助方法:

        static void SpawnAsyncs(int count, Action<CountdownEvent> action)
        {
            var countdown = new CountdownEvent(count);
        
            for (int i = 0; i < count; i++)
            {
                action(countdown);
            }
        
            countdown.Wait();
        }
        

        并像这样使用它:

        static void Main(string[] args)
        {
            var urlXML = new BlockingCollection<Tuple<string, string>>();
            urlXML.Add(Tuple.Create("http://someurl.com", "filename"));
        
            // Add some more to collection...
        
            SpawnAsyncs(5, c =>
            {
                using (var web = new WebClient())
                {
                    var current = urlXML.Take();
        
                    web.UploadFileCompleted += (s, e) =>
                    {
                        // some code to mess with e.Result (response)
                        c.Signal();
                    };
        
                    web.UploadFileAsyncAsync(new Uri(current.Item1), current.Item2);
                }
            });
        
            Console.WriteLine("Done");
            Console.ReadKey();
        }
        

        【讨论】:

          猜你喜欢
          • 2021-12-24
          • 2012-05-11
          • 2017-09-03
          • 2011-01-12
          • 2020-04-19
          • 2013-03-19
          • 1970-01-01
          相关资源
          最近更新 更多