【问题标题】:Limiting concurrent requests using Rx and SelectMany使用 Rx 和 SelectMany 限制并发请求
【发布时间】:2016-05-20 11:27:53
【问题描述】:

我有一个我想使用HttpClient 同时下载的页面的 URL 列表。 URL 列表可能很大(100 个或更多!)

我目前有这个代码:

var urls = new List<string>
            {
                @"http:\\www.amazon.com",
                @"http:\\www.bing.com",
                @"http:\\www.facebook.com",
                @"http:\\www.twitter.com",
                @"http:\\www.google.com"
            };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);

问题:由于SelectMany的使用,几乎同时创建了一大堆Task。似乎如果 URL 列表足够大,很多任务都会超时(我收到 “A Task was cancelled” 异常)。

所以,我认为应该有一种方法,也许使用某种调度程序来限制并发任务的数量,在给定的时间不允许超过 5 或 6 个。

这样我就可以获得并发下载,而不会像现在那样启动太多可能会停止的任务。

如何做到这一点,这样我就不会因为大量超时任务而饱和?

【问题讨论】:

  • 您可能需要考虑使用DataFlow API。
  • 你能用我的代码集成它吗?我忽略了如何使用 DataFlow 来做到这一点。 TBH,我从未使用过它,但看看一些样本会有很大帮助。

标签: c# .net concurrency system.reactive reactive-programming


【解决方案1】:

记住SelectMany() 实际上是Select().Merge()。虽然SelectMany 没有maxConcurrent 参数,但Merge() 有。所以你可以使用它。

从您的示例中,您可以这样做:

var urls = new List<string>
    {
        @"http:\\www.amazon.com",
        @"http:\\www.bing.com",
        @"http:\\www.facebook.com",
        @"http:\\www.twitter.com",
        @"http:\\www.google.com"
    };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // 2 maximum concurrent requests!

contents.Subscribe(Console.WriteLine);

【讨论】:

    【解决方案2】:

    这是一个使用DataFlow API 的示例:

    private static Task DoIt()
    {
        var urls = new List<string>
        {
            @"http:\\www.amazon.com",
            @"http:\\www.bing.com",
            @"http:\\www.facebook.com",
            @"http:\\www.twitter.com",
            @"http:\\www.google.com"
        };
    
        var client = new HttpClient();
    
        //Create a block that takes a URL as input
        //and produces the download result as output
        TransformBlock<string,string> downloadBlock =
            new TransformBlock<string, string>(
                uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)),
                new ExecutionDataflowBlockOptions
                {
                    //At most 2 download operation execute at the same time
                    MaxDegreeOfParallelism = 2
                }); 
    
        //Create a block that prints out the result
        ActionBlock<string> doneBlock =
            new ActionBlock<string>(x => Console.WriteLine(x));
    
        //Link the output of the first block to the input of the second one
        downloadBlock.LinkTo(
            doneBlock,
            new DataflowLinkOptions { PropagateCompletion = true});
    
        //input the urls into the first block
        foreach (var url in urls)
        {
            downloadBlock.Post(url);
        }
    
        downloadBlock.Complete(); //Mark completion of input
    
        //Allows consumer to wait for the whole operation to complete
        return doneBlock.Completion;
    }
    
    static void Main(string[] args)
    {
        DoIt().Wait();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
    

    【讨论】:

    • 哇。它看起来真的很好,但我想知道如何使用 Rx 做同样的事情。提前致谢!
    【解决方案3】:

    你能看看这是否有帮助?

    var urls = new List<string>
            {
                @"http:\\www.amazon.com",
                @"http:\\www.bing.com",
                @"http:\\www.google.com",
                @"http:\\www.twitter.com",
                @"http:\\www.google.com"
            };
    
    var contents =
        urls
            .ToObservable()
            .SelectMany(uri =>
                Observable
                    .Using(
                        () => new System.Net.Http.HttpClient(),
                        client =>
                            client
                                .GetStringAsync(new Uri(uri, UriKind.Absolute))
                                .ToObservable()));
    

    【讨论】:

    • 抱歉,效果不好。一百个任务超时后取消:(
    • 你可以尝试使用EventLoopScheduler吗?
    • 谢谢。我已经尝试过了,它的行为是一样的。请看@Dorus 的答案,因为它很简单,而且可以按预期工作,没有太多麻烦。
    • @Enigmativity 你介意看看吗? stackoverflow.com/questions/37437657/…
    猜你喜欢
    • 2017-06-13
    • 2012-02-27
    • 1970-01-01
    • 2021-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-21
    相关资源
    最近更新 更多