【问题标题】:Parallel async operation, how to "kill" one after a timeout?并行异步操作,超时后如何“杀死”一个?
【发布时间】:2017-01-20 01:24:07
【问题描述】:

我有一个程序用于通过 SNMP 从我的网络获取一些数据。 我使用最后一个 .NET 和 SharpSnmp library。 我实现了this post 中公开的 ForEachAsync 方法。 所以我可以并行执行 snmp 请求(并详细说明响应),为列表中的每个设备创建一个任务。它可以工作,但如果设备由于某种原因没有回复,我的程序就会卡住。 所以我需要管理某种超时来“杀死”库公开的异步函数。 这就是我在 foreachAsync 中调用的函数:

 public static async Task<Tuple<string, List<Variable>, Exception>>
            GetAsync(string ip, IEnumerable<string> vars, int timeout = 5000)
        {
            try
            {
                IPAddress agentIp;
                bool parsed = IPAddress.TryParse(ip, out agentIp);
                if (!parsed)
                {
                    foreach (IPAddress address in
                        Dns.GetHostAddresses(ip).Where(address => address.AddressFamily == AddressFamily.InterNetwork))
                    {
                        agentIp = address;
                        break;
                    }

                    if (agentIp == null)
                        throw new Exception("Impossibile inizializzare la classe CGesSnmp senza un indirizzo IP valido");
                }

                IPEndPoint receiver = new IPEndPoint(agentIp, LOCAL_PORT);
                VersionCode version = VersionCode.V2;
                string community = "public";
                List<Variable> vList = new List<Variable>();
                foreach (string s in vars)
                    vList.Add(new Variable(new ObjectIdentifier(s)));
                 // This is the function I want to "stop" or "kill" in some way
                List<Variable> result = (List<Variable>)await Messenger.GetAsync(version, receiver, new OctetString(community), vList);

                return new Tuple<string, List<Variable>, Exception>(ip, result, null);
            }
            catch (Exception ex)
            {
                return new Tuple<string, List<Variable>, Exception>(ip, null, ex);
            }
        }  

【问题讨论】:

  • 查看是否存在接受 TaskCancellationToken 的方法的变体。
  • 我忘了指定。没有!
  • 你可以使用this technique包装它。

标签: c# .net async-await


【解决方案1】:

如果方法支持TaskCancellationToken

public static async Task<Tuple<string, List<Variable>, Exception>>
            GetAsync(string ip, IEnumerable<string> vars, int timeout = 5000)
        {
            try
            {
                IPAddress agentIp;
                bool parsed = IPAddress.TryParse(ip, out agentIp);
                if (!parsed)
                {
                    foreach (IPAddress address in
                        Dns.GetHostAddresses(ip).Where(address => address.AddressFamily == AddressFamily.InterNetwork))
                    {
                        agentIp = address;
                        break;
                    }

                    if (agentIp == null)
                        throw new Exception("Impossibile inizializzare la classe CGesSnmp senza un indirizzo IP valido");
                }

                IPEndPoint receiver = new IPEndPoint(agentIp, LOCAL_PORT);
                VersionCode version = VersionCode.V2;
                string community = "public";
                List<Variable> vList = new List<Variable>();
                foreach (string s in vars)
                    vList.Add(new Variable(new ObjectIdentifier(s)));
                 // This is the function I want to "stop" or "kill" in some way

                CancellationTokenSource cancel = new CancellationTokenSource(TimeSpan.FromSeconds(timeout));
                List<Variable> result = (List<Variable>)await Messenger.GetAsync(version, receiver, new OctetString(community), vList, cancel.Token);

                return new Tuple<string, List<Variable>, Exception>(ip, result, null);
            }
            catch (Exception ex)
            {
                return new Tuple<string, List<Variable>, Exception>(ip, null, ex);
            }
        }  

否则

public static Task<Tuple<string, List<Variable>, Exception>>
            GetAsync(string ip, IEnumerable<string> vars, int timeout = 5000){
 TaskCompletionSource<Tuple<string,List<Variable>>> taskSource = 
       new TaskCompletionSource<Tuple<string,List<Variable>>>();

 Task.Run(async ()=> {
     var result = await GetAsync(ip,vars);
     taskSource.TrySetResult(result);
 });

 Task.Run(async ()=>{
     await Task.Delay(TimeSpan.FromSeconds(timeout));
     taskSource.TrySetCancelled();
 });

 return taskSource.Task;
}



private static async Task<Tuple<string, List<Variable>, Exception>>
            _GetAsync(string ip, IEnumerable<string> vars)
        {
            try
            {
                IPAddress agentIp;
                bool parsed = IPAddress.TryParse(ip, out agentIp);
                if (!parsed)
                {
                    foreach (IPAddress address in
                        Dns.GetHostAddresses(ip).Where(address => address.AddressFamily == AddressFamily.InterNetwork))
                    {
                        agentIp = address;
                        break;
                    }

                    if (agentIp == null)
                        throw new Exception("Impossibile inizializzare la classe CGesSnmp senza un indirizzo IP valido");
                }

                IPEndPoint receiver = new IPEndPoint(agentIp, LOCAL_PORT);
                VersionCode version = VersionCode.V2;
                string community = "public";
                List<Variable> vList = new List<Variable>();
                foreach (string s in vars)
                    vList.Add(new Variable(new ObjectIdentifier(s)));
                 // This is the function I want to "stop" or "kill" in some way
                List<Variable> result = (List<Variable>)await Messenger.GetAsync(version, receiver, new OctetString(community), vList);

                return new Tuple<string, List<Variable>, Exception>(ip, result, null);
            }
            catch (Exception ex)
            {
                return new Tuple<string, List<Variable>, Exception>(ip, null, ex);
            }
        }  

【讨论】:

  • 好的。这很有用。但它不起作用,至少在 forEachAsync 扩展中是这样。你看到我链接的文章了吗?如果我所有的设备都在线,一切正常(就像以前一样),如果一个或多个设备离线,我就会卡住(所以显然是一样的)。
【解决方案2】:
Task<[whatever type GetAsync returns]> messengerTask = Messenger.GetAsync(version, receiver, new OctetString(community), vList);


if (await Task.WhenAny(messengerTask, Task.Delay(timeout)) == messengerTask)
{
//GetAsync completed within timeout. Access task's result using
// messengerTask.Result. You can also check if the task RanToCompletion.
} 
else
{ 
//timeout    
}

【讨论】:

  • 好的。但是在这种情况下,getAsync“任务”会发生什么?
  • 无论如何它不会编译
  • :) 它无法编译,因为您可能没有使用从 GetAsync 任务返回的类型更改 [GetAsync 返回的任何类型]。
  • 哦不,我改变了,但编译器不接受它。它假装一种 List,而不是 Task>
  • 我修改了你的代码,它似乎可以工作。无论如何,我必须检查一些长时间的性能。您必须使用 Task.Run 创建一个任务,包装异步调用。
猜你喜欢
  • 1970-01-01
  • 2012-08-16
  • 1970-01-01
  • 1970-01-01
  • 2015-09-03
  • 2020-02-13
  • 2016-04-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多