【发布时间】:2020-09-09 02:01:24
【问题描述】:
典型情况:生产者快,消费者慢,需要让生产者慢下来。
无法按预期工作的示例代码(如下所述):
// I assumed this block will behave like BlockingCollection, but it doesn't
var bb = new BufferBlock<int>(new DataflowBlockOptions {
BoundedCapacity = 3, // looks like this does nothing
});
// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
while (dataSource < 10) {
var message = ++dataSource;
bb.Post(message);
Console.WriteLine($"Posted: {message}");
}
Console.WriteLine("Calling .Complete() on buffer block");
bb.Complete();
});
// slow consumer
var ab = new ActionBlock<int>(i => {
Thread.Sleep(500);
Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 2,
});
bb.LinkTo(ab);
ab.Completion.Wait();
我认为这段代码会起作用,但它不起作用:
-
BufferBlockbb是容量为 3 的阻塞队列。一旦达到容量,生产者应该无法.Post()到它,直到有一个空槽。- 不能那样工作。
bb似乎很乐意接受任意数量的消息。
- 不能那样工作。
-
producer是一个快速发布消息的任务。发布所有消息后,对bb.Complete()的调用应通过管道传播,并在处理完所有消息后发出关闭信号。因此在最后等待ab.Completion.Wait();。- 也不起作用。只要调用了
.Complete(),动作块ab就不会再收到任何消息。
- 也不起作用。只要调用了
可以使用BlockingCollection 来完成,我认为在 TPL 数据流 (TDF) 世界中 BufferBlock 相当于。我想我误解了背压在 TPL 数据流中应该如何工作。
那么问题在哪里?如何运行此管道,不允许缓冲区bb 中的消息超过 3 条,并等待其完成?
PS:我发现了这个要点 (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938),建议在其中维护一个信号量来阻止写入 BufferBlock。我认为这是“内置”的。
接受答案后更新:
接受答案后更新:
如果你正在看这个问题,你需要记住ActionBlock 也有自己的输入缓冲区。
这是一个。然后您还需要意识到,因为所有块都有自己的输入缓冲区,所以您不需要BufferBlock,因为您可能认为它的名字暗示了它。 BufferBlock 更像是用于更复杂架构的实用程序块或平衡加载块。但它不是背压缓冲区。
完成传播需要在链接级别明确定义。
调用.LinkTo() 时需要显式传递new DataflowLinkOptions {PropagateCompletion = true} 作为第二个参数。
【问题讨论】:
-
尝试使用SendAsync 代替
Post?它阻塞直到有空闲空间 -
是的,你想要的是
await someBlock.SendAsync(item),它将等待块队列中的空闲空间。Post在无法插入项目时只会返回 false。 -
您应该发布您的更新作为答案并接受它,因为它是真正的解决方案(我将发布您所说的内容)。如果你这样做,我会投票赞成。
-
@ScottChamberlain 感谢您的建议,有点懒惰。顺便说一句,如果您有比我现在发布的更好的示例,请在此处发布,我很乐意更改已接受的答案!
标签: c# task-parallel-library dataflow tpl-dataflow