【问题标题】:Invoking operations asynchronously with expiration使用过期异步调用操作
【发布时间】:2010-12-03 21:10:04
【问题描述】:

我有一个经典的异步消息调度问题。本质上,我需要异步调度消息,然后在调度完成时捕获消息响应。问题是,我似乎无法弄清楚如何让任何一个请求周期自到期和短路。

这是我目前使用的模式示例:

为调用定义的委托

private delegate IResponse MessageDispatchDelegate(IRequest request);

使用回调发送消息

var dispatcher = new MessageDispatchDelegate(DispatchMessage);
dispatcher.BeginInvoke(requestMessage, DispatchMessageCallback, null);

发送消息

private IResponse DispatchMessage(IRequest request)
{
  //Dispatch the message and throw exception if it times out
}

以响应或异常的形式获取调度结果

private void DispatchMessageCallback(IAsyncResult ar)
{
  //Get result from EndInvoke(r) which could be IResponse or a Timeout Exception
}

我想不通的是如何在 DispatchMessage 方法中干净地实现超时/短路过程。任何想法将不胜感激

【问题讨论】:

  • 别去那里,那里有疯狂、绝望和活生生吃掉你的巨龙。最后一个是好的结果。

标签: c# .net multithreading asynchronous delegates


【解决方案1】:
        var dispatcher = new MessageDispatchDelegate(DispatchMessage);

        var asyncResult = dispatcher.BeginInvoke(requestMessage, DispatchMessageCallback, null);
        if (!asyncResult.AsyncWaitHandle.WaitOne(1000, false))
        {
             /*Timeout action*/
        }
        else
        {
            response = dispatcher.EndInvoke(asyncResult);
        }

【讨论】:

  • 不会阻止调用,否则会开始下一次并行调用重叠调度并使整个过程同步?目标是允许 n 次调用并行发送消息。话虽如此,值得一试。
  • 我做了一个测试台,并排测试了这两种模式。一个有你的建议,一个没有(例如没有超时支持)。我的直觉是正确的。这成为一个同步过程;如您所料;需要更多时间。事实证明,还有许多其他副作用需要处理。为了提高性能,WaitHandle 必须显式关闭,并且在某些情况下 EndInvoke 被调用两次,这会导致代价高昂的异常(例如,在目标回调方法和“else”块中调用)。我能够在我的测试台上看到这两个问题。
  • @JoeGeeky:您可以使用WaitHandle.WaitAny 并行运行多个。但是,正如您的回答那样,使用新的 appdomain 是希望中止花费太长时间的操作的唯一方法(即使使用 appdomain,如果操作正在运行非托管代码,则可能无法中止)。
  • 谢谢,我没有想到非托管影响,虽然我不打算在我的情况下使用它。
【解决方案2】:

经过一番摸索后,我终于找到了解决我最初问题的方法。首先,让我说我收到了很多很棒的回复,并且我测试了所有回复(用结果评论每一个)。主要问题是所有提议的解决方案要么导致死锁(导致 100% 超时场景),要么导致异步进程同步。我不喜欢回答我自己的问题(有史以来第一次),但在这种情况下,我接受了 StackOverflow 常见问题解答的建议,因为我已经真正吸取了教训,并希望与社区。​​p>

最后,我将提议的解决方案与调用委托组合到备用 AppDomain 中。它的代码多一点,成本也高一点,但这避免了死锁并允许完全异步调用,这正是我所需要的。这是位...

首先我需要一些东西来调用另一个 AppDomain 中的委托

/// <summary>
/// Invokes actions in alternate AppDomains
/// </summary>
public static class DomainInvoker
{
    /// <summary>
    /// Invokes the supplied delegate in a new AppDomain and then unloads when it is complete
    /// </summary>
    public static T ExecuteInNewDomain<T>(Delegate delegateToInvoke, params object[] args)
    {
        AppDomain invocationDomain = AppDomain.CreateDomain("DomainInvoker_" + delegateToInvoke.GetHashCode());

        T returnValue = default(T);
        try
        {
            var context = new InvocationContext(delegateToInvoke, args);
            invocationDomain.DoCallBack(new CrossAppDomainDelegate(context.Invoke));

            returnValue = (T)invocationDomain.GetData("InvocationResult_" + invocationDomain.FriendlyName);
        }
        finally
        {
            AppDomain.Unload(invocationDomain);
        }
        return returnValue;
    }

    [Serializable]
    internal sealed class InvocationContext
    {
        private Delegate _delegateToInvoke;
        private object[] _arguments;

        public InvocationContext(Delegate delegateToInvoke, object[] args)
        {
            _delegateToInvoke = delegateToInvoke;
            _arguments = args;
        }

        public void Invoke()
        {
            if (_delegateToInvoke != null)
                AppDomain.CurrentDomain.SetData("InvocationResult_" + AppDomain.CurrentDomain.FriendlyName,
                    _delegateToInvoke.DynamicInvoke(_arguments));
        }
    }
}

第二我需要一些东西来协调所需参数的收集并收集/解决结果。这还将定义将在备用 AppDomain 中异步调用的超时和工作进程

注意:在我的测试中,我扩展了 dispatch worker 方法以随机花费时间来观察在超时和非超时情况下一切都按预期工作

public delegate IResponse DispatchMessageWithTimeoutDelegate(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs);

[Serializable]
public sealed class MessageDispatcher
{
    public const int DefaultTimeoutMs = 500;

    /// <summary>
    /// Public method called on one more many threads to send a request with a timeout
    /// </summary>
    public IResponse SendRequest(IRequest request, int timeout)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        return DomainInvoker.ExecuteInNewDomain<Response>(dispatcher, request, timeout);
    }

    /// <summary>
    /// Worker method invoked by the <see cref="DomainInvoker.ExecuteInNewDomain<>"/> process 
    /// </summary>
    private IResponse SendRequestWithTimeout(IRequest request, int timeout)
    {
        IResponse response = null;

        var dispatcher = new DispatchMessageDelegate(DispatchMessage);

        //Request Dispatch
        var asyncResult = dispatcher.BeginInvoke(request, null, null);

        //Wait for dispatch to complete or short-circuit if it takes too long
        if (!asyncResult.AsyncWaitHandle.WaitOne(timeout, false))
        {
            /* Timeout action */
            response = null;
        }
        else
        {
            /* Invoked call ended within the timeout period */
            response = dispatcher.EndInvoke(asyncResult);
        }

        return response;
    }

    /// <summary>
    /// Worker method to do the actual dispatch work while being monitored for timeout
    /// </summary>
    private IResponse DispatchMessage(IRequest request)
    {
        /* Do real dispatch work here */
        return new Response();
    }
}

第三我需要一些东西来代替异步触发调度的实际事物

注意:这只是为了演示我需要的异步行为。实际上,上面的 FirstSecond 项展示了备用线程上超时行为的隔离。这只是演示了如何使用上述资源

public delegate IResponse DispatchMessageDelegate(IRequest request);

class Program
{
    static int _responsesReceived;

    static void Main()
    {
        const int max = 500;

        for (int i = 0; i < max; i++)
        {
            SendRequest(new Request());
        }

        while (_responsesReceived < max)
        {
            Thread.Sleep(5);
        }
    }

    static void SendRequest(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        dispatcher.BeginInvoke(request, timeout, SendMessageCallback, request);
    }

    static IResponse SendRequestWithTimeout(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new MessageDispatcher();
        return dispatcher.SendRequest(request, timeout);
    }

    static void SendMessageCallback(IAsyncResult ar)
    {
        var result = (AsyncResult)ar;
        var caller = (DispatchMessageWithTimeoutDelegate)result.AsyncDelegate;

        Response response;

        try
        {
            response = (Response)caller.EndInvoke(ar);
        }
        catch (Exception)
        {
            response = null;
        }

        Interlocked.Increment(ref _responsesReceived);
    }
}

回想起来,这种方法会产生一些意想不到的后果。由于工作方法出现在备用 AppDomain 中,这为异常添加了额外的保护(尽管它也可以隐藏它们),允许您加载和卸载其他托管程序集(如果需要),并允许您定义高度受限或专门的安全上下文。这需要更多的产品化,但提供了回答我最初问题的框架。希望这对某人有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-04-21
    • 1970-01-01
    • 2017-12-16
    • 2016-06-17
    • 2017-09-09
    • 2016-10-15
    • 1970-01-01
    相关资源
    最近更新 更多