【问题标题】:How to execute multiple async calls in parallel?如何并行执行多个异步调用?
【发布时间】:2011-10-26 20:11:21
【问题描述】:

我有许多命令调用一个肥皂网络服务(Betfair API)。都是经典的异步编程模型类型...

public void DoXXX( <input parameters ...> )
{
    XXXRequest Request = new XXXRequest();
    // populate Request from input parameters ...
    BetfairService.BeginXXX( Request, XXXCallback, State );
}

private void XXXCallback(IAsyncResult Result)
{
    XXXResponse Response = BetfairService.EndXXX(Result);
    if (Response.ErrorCode == XXXErrorCode.OK)
        // store data from Response
    else
        // deal with error
}

我想执行一组指定的命令,然后在所有命令完成后使用组合返回的数据值进行一些计算。

我可以按顺序执行此操作,方法是创建一个命令队列,并让每个回调方法在完成后触发队列中的下一个命令,并将计算作为队列中的最后一项。但是,这相对较慢。

我理想的解决方案是让所有这些命令并行运行,然后在所有命令完成后触发计算。我试过查看 Task.Factory.FromAsync(),但我能找到的所有示例都只包括对 BeginXXX / EndXXX 的直接调用,而不是对响应做任何事情。

有没有人对这个问题的合适解决方案有任何指示?

【问题讨论】:

    标签: c# asynchronous


    【解决方案1】:

    要使用FromAsync,需要指定返回类型:

    var task = Task<XXXResponse>.Factory.FromAsync( ...
    

    然后您有一个具有Result 类型的XXXResponse 属性的任务。

    然后您可以使用Parallel.Invoke 并行运行初始命令。这将阻塞,直到所有这些任务完成。然后你可以做你的“额外处理”。

    或者您可以将初始任务存储在一个数组中并使用Task.Factory.ContinueWhenAll 创建一个延续。

    尼克

    【讨论】:

    • 如果我使用的是 Task.Factory.ContinueWhenAll,有没有办法强制它等待所有回调实际完成处理而不是仅仅被调用?
    • 我不完全确定你在问什么。在EndXXX 方法完成之前,从Task.Factory.FromAsync 返回的任务不应完成。 ContinueWhenAll 还返回一个Task 对象,您可以在该对象上执行所有正常的任务操作,例如Wait()
    • 其实那是我的错。我正在为 WPF 开发,因此可能更新绑定属性的所有内容都包含在 CurrentDispatcher.BeginInvoke() 中 - 这就是导致我的排序问题的原因。这不是任务所必需的,一切正常。感谢您的提示。
    【解决方案2】:

    我建议您查看 Microsoft 的 Reactive Extensions (Rx) 来做您想做的事。它允许您将异步操作(除其他外)转换为可观察的 LINQ 查询。

    假设我有这三个函数,每个函数都需要大量时间来计算:

    Func<int> fa = () =>
    {
        Thread.Sleep(2000);
        return 42;
    };
    
    Func<int, string, string> fb = (n, t) =>
    {
        Thread.Sleep(n * 1000);
        return t + n.ToString();
    };
    
    Func<DateTimeOffset> fc = () =>
    {
        Thread.Sleep(1000);
        return DateTimeOffset.UtcNow;
    };
    

    然后我可以使用FromAsyncPattern 方法将这些 lambda 函数转换为可观察函数:

    Func<IObservable<int>> ofa =
        Observable
            .FromAsyncPattern<int>(
                fa.BeginInvoke,
                fa.EndInvoke);
    
    Func<int, string, IObservable<string>> ofb =
        Observable
            .FromAsyncPattern<int, string, string>(
                fb.BeginInvoke,
                fb.EndInvoke);
    
    Func<IObservable<DateTimeOffset>> ofc =
        Observable
            .FromAsyncPattern<DateTimeOffset>(
                fc.BeginInvoke,
                fc.EndInvoke);
    

    现在我只需执行以下操作即可开始所有呼叫:

    IObservable<int> oa = ofa();
    IObservable<string> ob = ofb(1, "foo");
    IObservable<DateTimeOffset> oc = ofc();
    

    这有效地启动了三个并行计算。现在我们只需要将结果汇总在一起即可。

    这就是 LINQ 的用武之地:

    var query =
        from a in oa
        from b in ob
        from c in oc
        select new { a, b, c };
    

    然后我订阅此查询以获取结果:

    query.Subscribe(p =>
    {
        Console.WriteLine(p.a);
        Console.WriteLine(p.b);
        Console.WriteLine(p.c);
    });
    

    在我的测试中,我在这段代码周围放置了计时器来计算实际执行时间。尽管如果串联运行总时间应该是 4 秒,但此代码在 2 秒内完成 - 三者中的任何一个的最长时间。

    现在这个例子只是 Rx 可以做的一小部分,但它是一个很好的起点。

    如果我可以进一步解释,请大声喊叫。

    这里是 Rx 的链接:

    【讨论】:

      【解决方案3】:

      您应该有一个已执行服务调用的计数器。在每个回调方法中,您应该检查此计数器 - 如果它等于服务调用的最大数量,您应该进行额外处理,否则 - 您只需增加计数器。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-04-14
        • 1970-01-01
        • 2015-12-26
        • 2020-11-14
        • 2015-11-03
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多