【发布时间】:2015-09-22 12:48:56
【问题描述】:
在我的生产者-消费者场景中,我有多个消费者,每个消费者都向外部硬件发送一个操作,这可能需要一些时间。我的管道看起来有点像这样:
BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks
我已将我的 ActionBlocks 的 BoundedCapacity 分配给 1。 理论上我想要的是,只有当我的一个 Actionblock 可用于操作时,我才想触发 Batchblock 将一组项目发送到 Transformblock。到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递给 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我确实对 BatchBlock 批量大小有一个非常高的上限,但是我真的不希望达到这个限制,我想根据执行上述任务的动作块的可用性来触发我的批次.
我在 Triggerbatch() 方法的帮助下实现了这一点。我将 Batchblock.Triggerbatch() 称为我的 ActionBlock 中的最后一个操作。然而有趣的是,经过几天的正常工作,管道已经大功告成。经过检查,我发现有时批处理块的输入是在 ActionBlock 完成其工作之后进入的。在这种情况下,ActionBlock 确实在其工作结束时调用了 Triggerbatch,但是由于此时 Batchblock 根本没有输入,所以对 TriggerBatch 的调用是没有结果的。一段时间后,当输入确实流入 Batchblock 时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些内容的东西,但是没有这样的功能可用,我也找不到检查 TriggerBatch 是否有效的方法。
谁能建议我的问题的可能解决方案。不幸的是,使用 Timer 来触发批处理对我来说不是一个选择。除了流水线的开始,节流应该只受其中一个 ActionBlock 的可用性控制。
示例代码在这里:
static BatchBlock<int> _groupReadTags;
static void Main(string[] args)
{
_groupReadTags = new BatchBlock<int>(1000);
var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
int batchNo = 1;
TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
{
Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
//_groupReadTags.TriggerBatch();
int sum = 0;
foreach (int item in list)
{
Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
sum += item;
}
batchNo++;
return sum;
});
ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from ONE :{0}",x);
await Task.Delay(500);
Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);
_groupReadTags.TriggerBatch();
},consumerOptions);
ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from TWO :{0}", x);
await Task.Delay(2000);
_groupReadTags.TriggerBatch();
}, consumerOptions);
_groupReadTags.LinkTo(_workingBlock);
_workingBlock.LinkTo(_frameBuffer);
_frameBuffer.LinkTo(_worker1);
_frameBuffer.LinkTo(_worker2);
_groupReadTags.Post(10);
_groupReadTags.Post(20);
_groupReadTags.TriggerBatch();
Task postingTask = new Task(() => PostStuff());
postingTask.Start();
Console.ReadLine();
}
static void PostStuff()
{
for (int i = 0; i < 10; i++)
{
_groupReadTags.Post(i);
Thread.Sleep(100);
}
Parallel.Invoke(
() => _groupReadTags.Post(100),
() => _groupReadTags.Post(200),
() => _groupReadTags.Post(300),
() => _groupReadTags.Post(400),
() => _groupReadTags.Post(500),
() => _groupReadTags.Post(600),
() => _groupReadTags.Post(700),
() => _groupReadTags.Post(800)
);
}
【问题讨论】:
-
限制是通过设置输入、输出和链接选项的适当限制来实现的,例如通过设置DataflowLinkOptions.MaxMessages属性。但是,发布的代码甚至不会传播完成 -
LinkTo(source,target)重载不会传播完成。您需要使用接受链接选项和过滤谓词的the overload -
@PanagiotisKanavos max messages 与节流无关。
-
@i3arnon 我只是指出应该使用数据流的机制来处理节流,而不是尝试从头开始模拟它们。从问题 why 设置边界不起作用并且代码没有帮助 - 完成没有传播并且唯一的边界等于 1
-
@PanagiotisKanavos 感谢您的评论。我真的不希望传播完成,因为这段代码实际上应该一直运行,如果我传播完成,我将不得不重新初始化我所有的 TPL 块,我不想这样做。另外,我没有过滤谓词,因为我不希望有条件地传播我的输出。这里的问题是另外一回事。我希望使用 TriggerBatch() 方法,一切都可以正常工作,除了我上面概述的情况,即在调用所有 TriggerBatch() 方法之后进入 Batchblock 的输入。
-
重新表述了这个问题,以免造成任何混淆。
标签: c# pipeline throttling tpl-dataflow