【发布时间】:2012-11-22 10:00:57
【问题描述】:
当两个转换块都完成时,如何重写代码完成的代码?我认为完成意味着它被标记为完成并且“out queue”是空的?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
我编辑了代码,为每个转换块添加了输入缓冲区计数。很明显,所有 100 个项目都流式传输到每个转换块。但是一旦其中一个变换块完成,处理器块就不再接受任何项目,而是不完整的变换块的输入缓冲区只是刷新输入缓冲区。
【问题讨论】:
-
请注意,转换块可能不会收到来自广播块的所有消息。他们只收到最新的消息。如果向广播块提供消息的速度快于转换块接收它们的速度,则转换块将丢失消息。另外,如果您想确保消息顺序等,您应该在
SendAsync(i)上await。 -
@urbanhusky 我不确定你关于
TransformBlock会错过消息的断言。文档声明BroadcastBlock保证它会在接受新项目之前传播到所有链接的目标。如果BoundedCapacity在目标上不受限制,那么目标TransformBlock将缓冲消息。现在,如果设置了BoundedCapacity,那么它将阻塞直到有容量为止,这可能会导致消息丢失,尤其是对于未等待的SendAsync。简而言之,我的理解是,如果不受限制,则不会丢弃任何消息。
标签: c# concurrency task-parallel-library tpl-dataflow