【问题标题】:How do I arrange flow control in TPL Dataflows?如何在 TPL 数据流中安排流量控制?
【发布时间】:2013-12-21 05:28:04
【问题描述】:

我正在努力控制 TPL 数据流中的数据流。我有一个非常快的生产者和一个非常慢的消费者。 (我的真实代码更复杂,但无论如何,这是一个很好的模型,它重现了问题。)

当我运行它时,代码开始消耗内存,就好像它已经过时了——并且生产者的输出队列尽可能快地填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会要求它。根据我对文档的阅读,这是应该发生的:也就是说,我认为生产者会等到消费者有空间。

显然,情况并非如此。如何解决它,以免队列发疯?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f =>
                {
                });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " +  EndOfTheLine.InputCount );
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}

【问题讨论】:

  • 您是否尝试过将 ParseFile 上的 BoundedCapacity 降低到小一点的值,比如 10 以确保它正常工作?
  • 路易斯,好主意。我试过了;它使CreateData 上的队列更大。我在这里试图解决的主要挑战是如何确保CreateDataParseFile 队列太大(或者它自己的输出队列太大)时暂停。
  • 对于输出有界并支持所有ExecutionDataflowBlockOptions 的替代TransformManyBlock 实现,请查看here

标签: c# asynchronous task-parallel-library tpl-dataflow


【解决方案1】:

通常情况下,在这种情况下,您还需要设置 CreateData 块的 BoundedCapacity。但这在这里行不通,因为TransformManyBlock 在从单个IEnumerable 填充输出队列时似乎没有考虑BoundedCapacity

您可以做的是创建一个函数来迭代集合并使用SendAsync() 仅在目标可以接受时发送更多数据:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            await target.SendAsync(item);
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}

用法:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();

如果您仍然希望 CreateData 块的行为与原始代码相似,您可以在它们之间设置两个有界的 BufferBlocks、SendAllAsync(),然后使用 Encapsulate() 使它们看起来像一个块:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}

【讨论】:

  • 感谢您的回复!我认为,这似乎解决了眼前的问题,尽管我仍然对您的代码感到困惑。特别是,这个模型使事情在批处理模式下发生——我得到一个boundedCapacity 价值的数据一次,然后它倒出来。然后是一个新的boundedCapacity。等等。我宁愿看看,如果可能的话,每个单元都是单独发送的。
  • 并且,在@svick 的评论中表达一点:事实证明,TransformMany 只检查输入条目之间的队列;为了使这个场景工作,我需要检查一个条目。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多