【问题标题】:How to Perform Multiple "Pings" in Parallel using C#如何使用 C# 并行执行多个“Ping”
【发布时间】:2012-11-04 13:03:53
【问题描述】:

我正在尝试计算一组服务器的平均往返时间。为了加快速度,我想并行执行 ping。我编写了一个名为AverageRoundtripTime() 的函数,它似乎可以工作,但是,由于我对多线程不太了解,我想知道我所做的是否还可以。请看一下我的代码,让我知道它是否可以,或者是否有更好的方法来实现我想要的:

public void Main()
{
    // Collection of hosts.
    List<String> hosts = new List<String>();

    // Add 100 hosts to the collection.
    for (Int32 i = 0; i < 100; ++i) hosts.Add("www.google.com");

    // Display the average round-trip time for 100 hosts.
    Console.WriteLine(AverageRoundtripTime(hosts));

}

public Double AverageRoundtripTime(IEnumerable<String> hosts)
{
    // Collection of threads.
    List<Thread> threads = new List<Thread>();

    // Collection of ping replies.
    List<PingReply> pingReplies = new List<PingReply>();

    // Loop through all host names.
    foreach (var host in hosts)
    {
        // Create a new thread.
        Thread thread = new Thread(() =>
        {
            // Variable to hold the ping reply.
            PingReply reply = null;

            // Create a new Ping object and make sure that it's 
            // disposed after we're finished with it.
            using (Ping ping = new Ping())
            {
                    reply = ping.Send(host);

            }

            // Get exclusive lock on the pingReplies collection.
            lock (pingReplies)
            {
                // Add the ping reply to the collection.
                pingReplies.Add(reply);

            }

        });

        // Add the newly created thread to the theads collection.
        threads.Add(thread);

        // Start the thread.
        thread.Start();

    }

    // Wait for all threads to complete
    foreach (Thread thread in threads)
    {
        thread.Join();

    }

    // Calculate and return the average round-trip time.
    return pingReplies.Average(x => x.RoundtripTime);

}

更新:

查看我提出的一个相关问题:

Task Parallel Library Code Freezes in a Windows Forms Application - Works fine as a Windows Console Application

【问题讨论】:

  • 使用任务并行库。
  • 我同意你的评论。唯一的问题是他使用的 .NET 版本。
  • 然后任务并行库是要走的路
  • @SLaks 你能举个例子吗?

标签: c# multithreading ping


【解决方案1】:

ping 类有一个方法SendAsync。这遵循基于事件的异步编程 (EAP) 模式。查看这篇文章:http://msdn.microsoft.com/en-us/library/ee622454.aspx

这里有个简单的例子,我有一个方法,它以非常基本的方式实现了那篇文章。您基本上可以根据需要多次调用它,并且所有 ping 都将异步完成。

    class Program
    {
    public static string[] addresses = {"microsoft.com", "yahoo.com", "google.com"};
    static void Main(string[] args)
    {
        List<Task<PingReply>> pingTasks = new List<Task<PingReply>>();
        foreach (var address in addresses)
        {
            pingTasks.Add(PingAsync(address));
        }

        //Wait for all the tasks to complete
        Task.WaitAll(pingTasks.ToArray());

        //Now you can iterate over your list of pingTasks
        foreach (var pingTask in pingTasks)
        {
            //pingTask.Result is whatever type T was declared in PingAsync
            Console.WriteLine(pingTask.Result.RoundtripTime);
        }
        Console.ReadLine();
    }

    static Task<PingReply> PingAsync(string address)
    {
        var tcs = new TaskCompletionSource<PingReply>();
        Ping ping = new Ping();
        ping.PingCompleted += (obj, sender) =>
            {
                tcs.SetResult(sender.Reply);
            };
        ping.SendAsync(address, new object());
        return tcs.Task;
    }
}

【讨论】:

  • 感谢您的示例!我如何知道所有“ping”何时已完成,以及如何收集每个 ping 的响应以计算平均值?
  • 啊,这是个好问题。我会在所有任务的调用函数中保留一个列表。然后你可以执行Task.WaitAll(listOfTasks.ToArray()) 之类的操作,它会一直阻塞,直到所有调用都完成。
  • 更新了一个(希望)更有用的例子。
  • 谢谢,Pete...当我运行您的代码时,我的程序在Task.WaitAll(pingTasks.ToArray()); 冻结。我过去曾尝试使用Task.WaitAll(),但它总是冻结...有什么想法吗?
  • 单步执行代码。它将阻塞(“冻结”),直到所有任务完成。如果没有更多代码,我可以告诉你的最好的就是你的任务由于某种原因仍在执行。
【解决方案2】:

使用 Parallel.For 和 ConcurrentBag

    static void Main(string[] args)
    {
        Console.WriteLine(AverageRoundTripTime("www.google.com", 100));
        Console.WriteLine(AverageRoundTripTime("www.stackoverflow.com", 100));
        Console.ReadKey();
    }

    static double AverageRoundTripTime(string host, int sampleSize)
    {
        ConcurrentBag<double> values = new ConcurrentBag<double>();
        Parallel.For(1, sampleSize, (x, y) => values.Add(Ping(host)));
        return values.Sum(x => x) / sampleSize;
    }
    static double Ping(string host)
    {
        var reply = new Ping().Send(host);
        if (reply != null)
            return reply.RoundtripTime;
        throw new Exception("denied");
    }

【讨论】:

  • 谢谢,shiznit...您的代码运行良好。一个问题,似乎我的原始代码至少快了 2 倍……这有什么原因吗?使用任务并行库是否有额外开销?
  • 从资源监视器和 Parallel.For 的文档来看,它似乎试图限制它将使用的线程数。在这个例子中,我的代码使用了大约 20 个,而您的实现和 Pete 创建了大约 40 个
  • 感谢您的解释 shiznit...我想知道他们为什么将 Parallel.For 限制为 20 个线程?我想我必须阅读文档。再次感谢!
【解决方案3】:

// 使用 LINQ 解决方案变得更简单

List<String> hosts = new List<String>();
for (Int32 i = 0; i < 100; ++i) hosts.Add("www.google.com");

var average = hosts.AsParallel().WithDegreeOfParallelism(64).
              Select(h => new Ping().Send(h).RoundtripTime).Average();


Console.WriteLine(average)

【讨论】:

    【解决方案4】:

    也许像这样使用SendPingAsync

    using (var ping = new Ping())
    {
        var replies = await Task.WhenAll(hosts.Select(x => ping.SendPingAsync(x)))
                                .ConfigureAwait(false);
                                // false here   ^ unless you want to schedule back to sync context
        ... process replies.
    }
    

    【讨论】:

      【解决方案5】:

      解决方案:

      internal class Utils
      {
          internal static PingReply Ping (IPAddress address, int timeout = 1000, int ttl = 64)
          {
                  PingReply tpr = null;
                  var p = new Ping ();
                  try {
      
                      tpr = p.Send (address,
                          timeout,
                          Encoding.ASCII.GetBytes ("oooooooooooooooooooooooooooooooo"),
                          new PingOptions (ttl, true));
      
                  } catch (Exception ex) {
      
                      tpr = null;
      
                  } finally {
                      if (p != null)
                          p.Dispose ();
      
                      p = null;
                  }
      
                  return tpr;
              }
      
              internal static List<PingReply> PingAddresses (List<IPAddress> addresses, int timeout = 1000, int ttl = 64)
              {
                  var ret = addresses
                      .Select (p => Ping (p, timeout, ttl))
                      .Where (p => p != null)
                      .Where (p => p.Status == IPStatus.Success)
                      .Select (p => p).ToList ();
      
                  return ret;
              }
      
              internal static Task PingAddressesAsync (List<IPAddress> addresses, Action<Task<List<PingReply>>> endOfPing, int timeout = 1000, int ttl = 64)
              {
      
                  return Task.Factory.StartNew<List<PingReply>> (() => Utils.PingAddresses (
                      addresses, timeout, ttl)).ContinueWith (t => endOfPing (t));
      
              }   
      }
      

      并使用:

      Console.WriteLine ("start");
      
      Utils.PingAddressesAsync (new List<IPAddress> () { 
                          IPAddress.Parse ("192.168.1.1"), 
                          IPAddress.Parse ("192.168.1.13"), 
                          IPAddress.Parse ("192.168.1.49"),
                          IPAddress.Parse ("192.168.1.200")
                      }, delegate(Task<List<PingReply>> tpr) {
      
                          var lr = tpr.Result;
                          Console.WriteLine ("finish with " + lr.Count.ToString () + " machine found");
      
                          foreach (var pr in lr) {
                              Console.WriteLine (pr.Address.ToString ());
              }
      
      });
      
      Console.WriteLine ("execute");
      Console.ReadLine ();
      

      【讨论】:

        【解决方案6】:

        这是一个可以 ping 多个端点的异步工作者。您可以 Start() 或 Stop() 心跳工作者并订阅以下事件:

        • PingUp(当端点关闭时边缘触发)
        • PingDown(当端点启动时边缘触发)
        • PulseStarted
        • 脉冲结束
        • Ping 错误

        -

        public class NetworkHeartbeat
        {
            private static object lockObj = new object();
        
            public bool Running { get; private set; }
            public int PingTimeout { get; private set; }
            public int HeartbeatDelay { get; private set; }
            public IPAddress[] EndPoints { get; private set; }
            public int Count => EndPoints.Length;
            public PingReply[] PingResults { get; private set; }
            private Ping[] Pings { get; set; }
        
            public NetworkHeartbeat(IEnumerable<IPAddress> hosts, int pingTimeout, int heartbeatDelay)
            {
                PingTimeout = pingTimeout;
                HeartbeatDelay = heartbeatDelay;
        
                EndPoints = hosts.ToArray();
                PingResults = new PingReply[EndPoints.Length];
                Pings = EndPoints.Select(h => new Ping()).ToArray();
            }
        
            public async void Start()
            {
                if (!Running)
                {
                    try
                    {
                        Debug.WriteLine("Heartbeat : starting ...");
        
                        // set up the tasks
                        var chrono = new Stopwatch();
                        var tasks = new Task<PingReply>[Count];
        
                        Running = true;
        
                        while (Running)
                        {
                            // set up and run async ping tasks                 
                            OnPulseStarted(DateTime.Now, chrono.Elapsed);
                            chrono.Restart();
                            for (int i = 0; i < Count; i++)
                            {
                                tasks[i] = PingAndUpdateAsync(Pings[i], EndPoints[i], i);
                            }
                            await Task.WhenAll(tasks);
        
                            for (int i = 0; i < tasks.Length; i++)
                            {
                                var pingResult = tasks[i].Result;
        
                                if (pingResult != null)
                                {
                                    if (PingResults[i] == null)
                                    {
                                        if (pingResult.Status == IPStatus.Success)
                                            OnPingUp(i);
                                    }
                                    else if (pingResult.Status != PingResults[i].Status)
                                    {
                                        if (pingResult.Status == IPStatus.Success)
                                            OnPingUp(i);
                                        else if (PingResults[i].Status == IPStatus.Success)
                                            OnPingDown(i);
                                    }
                                }
                                else
                                {
                                    if (PingResults[i] != null && PingResults[i].Status == IPStatus.Success)
                                        OnPingUp(i);
                                }
        
                                PingResults[i] = tasks[i].Result;
                                Debug.WriteLine("> Ping [" + PingResults[i].Status.ToString().ToUpper() + "] at " + EndPoints[i] + " in " + PingResults[i].RoundtripTime + " ms");
                            }
        
                            OnPulseEnded(DateTime.Now, chrono.Elapsed);
        
                            // heartbeat delay
                            var delay = Math.Max(0, HeartbeatDelay - (int)chrono.ElapsedMilliseconds);
                            await Task.Delay(delay);
                        }
                        Debug.Write("Heartbeat : stopped");
                    }
                    catch (Exception)
                    {
                        Debug.Write("Heartbeat : stopped after error");
                        Running = false;
                        throw;
                    }
                }
                else
                {
                    Debug.WriteLine("Heartbeat : already started ...");
                }
            }
        
            public void Stop()
            {
                Debug.WriteLine("Heartbeat : stopping ...");
                Running = false;
            }
        
            private async Task<PingReply> PingAndUpdateAsync(Ping ping, IPAddress epIP, int epIndex)
            {
                try
                {
                    return await ping.SendPingAsync(epIP, PingTimeout);
                }
                catch (Exception ex)
                {
                    Debug.Write("-[" + epIP + "] : error in SendPing()");
                    OnPingError(epIndex, ex);
                    return null;
                }
            }
        
            // Event on ping errors
            public event EventHandler<PingErrorEventArgs> PingError;
            public class PingErrorEventArgs : EventArgs
            {
                public int EndPointIndex { get; private set; }
                public Exception InnerException { get; private set; }
        
                public PingErrorEventArgs(int epIndex, Exception ex)
                {
                    EndPointIndex = epIndex;
                    InnerException = ex;
                }
            }
            private void OnPingError(int epIndex, Exception ex) => PingError?.Invoke(this, new PingErrorEventArgs(epIndex, ex));
        
            // Event on ping Down
            public event EventHandler<int> PingDown;
            private void OnPingDown(int epIndex)
            {
                Debug.WriteLine("# Ping [DOWN] at " + EndPoints[epIndex]);
                PingDown?.Invoke(this, epIndex);
            }
        
            // Event on ping Up
            public event EventHandler<int> PingUp;
            private void OnPingUp(int epIndex)
            {
                Debug.WriteLine("# Ping [UP] at " + EndPoints[epIndex] );
                PingUp?.Invoke(this, epIndex);
            }
        
            // Event on pulse started
            public event EventHandler<PulseEventArgs> PulseStarted;
            public class PulseEventArgs : EventArgs
            {
                public DateTime TimeStamp { get; private set; }
                public TimeSpan Delay { get; private set; }
        
                public PulseEventArgs(DateTime date, TimeSpan delay)
                {
                    TimeStamp = date;
                    Delay = delay;
                }
            }
            private void OnPulseStarted(DateTime date, TimeSpan delay)
            {
                Debug.WriteLine("# Heartbeat [PULSE START] after " + (int)delay.TotalMilliseconds + " ms");
                PulseStarted?.Invoke(this, new PulseEventArgs(date, delay));
            }
        
            // Event on pulse ended
            public event EventHandler<PulseEventArgs> PulseEnded;
            private void OnPulseEnded(DateTime date, TimeSpan delay)
            {
                PulseEnded?.Invoke(this, new PulseEventArgs(date, delay));
                Debug.WriteLine("# Heartbeat [PULSE END] after " + (int)delay.TotalMilliseconds + " ms");
            }
        } 
        

        【讨论】:

          猜你喜欢
          • 2020-12-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多