【问题标题】:TPL Dataflow : How to throttle an entire pipeline?TPL 数据流:如何限制整个管道?
【发布时间】:2015-03-03 17:17:09
【问题描述】:

我想限制在 Dataflow 管道中发布的项目数量。项目的数量取决于生产环境。 这些对象消耗大量内存(图像),所以我想在管道的最后一个块完成其工作时发布它们。

我尝试使用SemaphoreSlim 来限制生产者并将其释放到管道的最后一个块中。可以,但是如果过程中出现异常,程序会一直等待,异常不会被拦截。

这是一个看起来像我们的代码的示例。 我怎样才能做到这一点 ?

static void Main(string[] args)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);

    var downloadString = new TransformBlock<string, string>(uri =>
    {
        Console.WriteLine("Downloading '{0}'...", uri);
        return new WebClient().DownloadString(uri);
    });

    var createWordList = new TransformBlock<string, string[]>(text =>
    {
        Console.WriteLine("Creating word list...");

        char[] tokens = text.ToArray();
        for (int i = 0; i < tokens.Length; i++)
        {
            if (!char.IsLetter(tokens[i]))
                tokens[i] = ' ';
        }
        text = new string(tokens);

        return text.Split(new char[] { ' ' },
           StringSplitOptions.RemoveEmptyEntries);
    });

    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Filtering word list...");
        throw new InvalidOperationException("ouch !"); // explicit for test
        return words.Where(word => word.Length > 3).OrderBy(word => word)
           .Distinct().ToArray();
    });

    var findPalindromes = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Finding palindromes...");

        var palindromes = new ConcurrentQueue<string>();

        Parallel.ForEach(words, word =>
        {
            string reverse = new string(word.Reverse().ToArray());

            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
                palindromes.Enqueue(word);
            }
        });

        return palindromes.ToArray();
    });

    var printPalindrome = new ActionBlock<string[]>(palindromes =>
    {
        try
        {
            foreach (string palindrome in palindromes)
            {
                Console.WriteLine("Found palindrome {0}/{1}",
                   palindrome, new string(palindrome.Reverse().ToArray()));
            }
        }
        finally
        {
            semaphore.Release();
        }
    });

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findPalindromes);
    findPalindromes.LinkTo(printPalindrome);


    downloadString.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)createWordList).Fault(t.Exception);
        else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)filterWordList).Fault(t.Exception);
        else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
        else findPalindromes.Complete();
    });
    findPalindromes.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted)
            ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
        else printPalindrome.Complete();
    });

    try
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(i);

            downloadString.Post("http://www.google.com");
            semaphore.Wait(); // waits here when an exception throws
        }

        downloadString.Complete();

        printPalindrome.Completion.Wait();
    }
    catch (AggregateException agg)
    {
        Console.WriteLine("An error has occured : " + agg);
    }
    Console.WriteLine("Done");
    Console.ReadKey();
}

【问题讨论】:

    标签: c# .net task-parallel-library throttling tpl-dataflow


    【解决方案1】:

    您应该简单地同时等待信号量和完成任务。这样,如果块过早结束(通过异常或取消),那么异常将被重新抛出,如果没有,那么您将等待您的信号量,直到有空间发布更多信息。

    您可以使用 Task.WhenAnySemaphoreSlim.WaitAsync 做到这一点:

    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine(i);
        downloadString.Post("http://www.google.com");
    
        if (printPalindrome.Completion.IsCompleted)
        {
            break;
        }
    
        Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
    }
    

    注意:使用Task.Wait 仅适用于这种情况,因为它是Main。通常这应该是一个async 方法,你应该awaitTask.WhenAny 返回的任务。

    【讨论】:

    • 谢谢,这部分效果很好。但是,循环继续向两个未标记为故障的第一块生成项目。如果我修改这部分代码:findPalindromes.Completion.ContinueWith(t =&gt; { if (t.IsFaulted) { ((IDataflowBlock)printPalindrome).Fault(t.Exception); ((IDataflowBlock)downloadString).Fault(t.Exception); //mark the first block has faulted } else printPalindrome.Complete(); }); 它可以工作。但我不确定这是更好的方法。
    • 不会走这条路线只是让代码同步运行,因为它只是告诉主线程等待吗?
    • @n3bula 你可以只检查完成任务是否完成。看看我的更新。
    • @moarboilerplate 是的。主线程将同步运行和等待。
    • @i3arnon 好的,此解决方案停止生产者。有没有办法阻止生产者并防止第一个块消耗他们的缓冲区?要重现此问题,请像这样初始化信号量SemaphoreSlim semaphore = new SemaphoreSlim(4, 5);
    【解决方案2】:

    这就是我在任何时候处理限制或只允许源块中的 10 个项目的方式。您可以将其修改为 1。确保您还限制了管道中的任何其他块,否则,您可以获得 1 的源块和更多的下一个块。

    var sourceBlock = new BufferBlock<string>(
        new ExecutionDataflowBlockOptions() { 
            SingleProducerConstrained = true, 
            BoundedCapacity = 10 });
    

    然后生产者这样做:

    sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);
    

    如果您使用的是 async / await,只需等待 SendAsync 调用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-23
      相关资源
      最近更新 更多