【发布时间】:2013-12-21 05:28:04
【问题描述】:
我正在努力控制 TPL 数据流中的数据流。我有一个非常快的生产者和一个非常慢的消费者。 (我的真实代码更复杂,但无论如何,这是一个很好的模型,它重现了问题。)
当我运行它时,代码开始消耗内存,就好像它已经过时了——并且生产者的输出队列尽可能快地填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会要求它。根据我对文档的阅读,这是应该发生的:也就是说,我认为生产者会等到消费者有空间。
显然,情况并非如此。如何解决它,以免队列发疯?
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
namespace MemoryLeakTestCase
{
class Program
{
static void Main(string[] args)
{
var CreateData = new TransformManyBlock<int, string>(ignore =>
{
return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
});
var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
Thread.Sleep(1000);
return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
);
var EndOfTheLine = new ActionBlock<object>(f =>
{
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
CreateData.LinkTo(ParseFile, linkOptions);
ParseFile.LinkTo(EndOfTheLine, linkOptions);
Task t = new Task(() =>
{
while (true)
{
Console.WriteLine("CreateData: " + Report(CreateData));
Console.WriteLine("ParseData: " + Report(ParseFile));
Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount );
Thread.Sleep(1000);
}
});
t.Start();
CreateData.SendAsync(0);
CreateData.Complete();
EndOfTheLine.Completion.Wait();
}
public static string Report<T, U>(TransformManyBlock<T, U> block)
{
return String.Format("INPUT: {0} OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
}
}
}
【问题讨论】:
-
您是否尝试过将
ParseFile上的BoundedCapacity降低到小一点的值,比如 10 以确保它正常工作? -
路易斯,好主意。我试过了;它使
CreateData上的队列更大。我在这里试图解决的主要挑战是如何确保CreateData在ParseFile队列太大(或者它自己的输出队列太大)时暂停。 -
对于输出有界并支持所有
ExecutionDataflowBlockOptions的替代TransformManyBlock实现,请查看here。
标签: c# asynchronous task-parallel-library tpl-dataflow