【问题标题】:Conditional multiple LinkTo in TPLTPL 中的条件多个 LinkTo
【发布时间】:2022-01-28 21:44:43
【问题描述】:

我有一个链接到多个动作块的缓冲块:

bufferBlock.LinkTo(consumerActionBlock1, linkOptions);
bufferBlock.LinkTo(consumerActionBlock2, linkOptions);
bufferBlock.LinkTo(consumerActionBlockN, linkOptions);

效果是缓冲区块累积项目,一旦工人消费者可以接受更多项目,它就会获取它们(消费者一次设置为一个项目)。像这样,我有 N 个工作线程,而它们都在同一个队列上工作。到目前为止一切顺利。

现在我有一个管道,其中有两个缓冲区块,每个缓冲区都有特定优先级的工作项:

bufferBlockPrioHigh.LinkTo(consumerActionBlockHigh1, linkOptions);
bufferBlockPrioHigh.LinkTo(consumerActionBlockHigh2, linkOptions);
bufferBlockPrioHigh.LinkTo(consumerActionBlockHighN, linkOptions);

bufferBlockPrioLow.LinkTo(consumerActionBlockLow1, linkOptions);
bufferBlockPrioLow.LinkTo(consumerActionBlockLow2, linkOptions);
bufferBlockPrioLow.LinkTo(consumerActionBlockLowN, linkOptions);

也不错。现在我想实现这样的行为,如果低优先级队列中没有任何内容,那么高优先级队列可以使用它的工作人员。我添加了这段代码:

bufferBlockPrioHigh.LinkTo(consumerActionBlockLow1, linkOptions, c => bufferBlockPrioLow.Count == 0);
bufferBlockPrioHigh.LinkTo(consumerActionBlockLow2, linkOptions, c => bufferBlockPrioLow.Count == 0);
bufferBlockPrioHigh.LinkTo(consumerActionBlockLowN, linkOptions, c => bufferBlockPrioLow.Count == 0);

这个想法是高优先级缓冲区将首先尝试将项目发送到它自己的动作块。如果它们都忙,它会尝试将它们发送到低优先级缓冲区的动作块,因为此时低优先级缓冲区是空的。

这似乎有效。但是,如果所有的 LinkTo 都失败了,会发生什么? IE。没有自己的工作人员可用,并且所有有条件的 LinkTos 也会返回 false(即低优先级队列本身已满)? 整个任务现在是否会因为新的条件 LinkTos 而失败,或者一旦有条件或非条件的操作块空闲,任务最终会继续进行?

换句话说,我正在寻求了解何时评估多个 LinkTo 以及评估多少次。

【问题讨论】:

  • 为什么会有一系列consumerActionBlock1consumerActionBlock2 ... consumerActionBlockN,而不是一个consumerActionBlock配置MaxDegreeOfParallelism = N
  • 哪些块(如果有)配置了有限的BoundedCapacity
  • 所有消费者操作块的容量为 1。原因是为了控制确切的工人数量(以及背后的原因:资源。系统在重负载下变慢)。既然你这么说,可能只是设置 MaxDegreeOfParallelism 也一样
  • 我发现了一个可能相关的问题:TPL DataFlow, link blocks with priority?

标签: c# .net task-parallel-library tpl-dataflow


【解决方案1】:

好吧,如果有人会发现这个问题,我的发现:

这段代码

bufferBlockPrioHigh.LinkTo(consumerActionBlockHigh1, linkOptions);
bufferBlockPrioHigh.LinkTo(consumerActionBlockLow1, linkOptions, c => bufferBlockPrioLow.Count == 0);

没有意义,因为任务总是会流入consumerActionBlockHigh1,即使块已满。我最终完成了这个设置来实现我想要的:

bufferBlockPrioHigh.LinkTo(consumerActionBlockHigh1, linkOptions, bufferBlockPrioHigh.Count < capacity);
bufferBlockPrioHigh.LinkTo(consumerActionBlockLow1, linkOptions, c => bufferBlockPrioLow.Count == 0);
bufferBlockPrioHigh.LinkTo(consumerActionBlockHigh1, linkOptions);

这意味着:

  • 如果 bufferBlockPrioHigh 尚未满,立即使用 high prio worker
  • 尝试使用其他容量:否则,如果 bufferBlockPrioLow 为空,则使用其工作者
  • 否则使用 bufferBlockPrioHigh 工人

【讨论】:

  • 这很容易失败。没有什么可以保证缓冲区块从一次调用到下一次调用具有相同的计数。 LinkTo 中的谓词旨在过滤消息,而不是为此。没有什么会强迫您对所有消息使用单个 管道或进行直接连接。您可以使用构建器函数来创建使用不同参数或整个管道的块,例如具有一组 DOP、边界的高优先级和具有不同设置的低优先级
  • 但是我希望 Count() 每次都有不同的值:只有当计数很高(即队列已满)时,我才应该转发到低优先级队列。还是我错过了你的意思?我也想有几个管道,但我的问题是系统可以并行处理 10 个工作人员(总而言之)。如果我有一个 10 的高优先队列和 10 的低优先队列,系统会很慢。如果我同时拥有 5 个,并且 high prio 队列将满而 low prio 为空,high prio 将只有 5 个工人可供支配,而它本来可以有 10 个。
  • 换句话说,我想要一个管道,那里有 N 个工人。每个优先级队列至少有 M 个可供使用的工作人员 (M*numberOfPrios = N),但如果一个队列为空而另一个队列已满,则比满的队列将扩大并使用所有可用的 N 个工作人员。
  • Capacity includes the currently processed items(我不知道)。如果您有 DOP=Capacity==M*Priority 的块,您可以使用简单的LinkTo 来确保从客户端发布的消息将进入第一个可用管道。 LinkTo 连接按顺序评估
  • 这可能正是我不明白的重点。假设我在高优先级缓冲区中有 1000 个项目。通过我的回答中的链接,我的理解是它将填充 consumerActionBlockHigh1 直到达到其容量,然后对 consumerActionBlockLow1 执行相同操作(假设其缓冲区为空),然后将启动到 consumerActionBlockHigh1 的 3d LinkTo - 它将被评估, 消息将被链接,但会一直保留在缓冲区中,直到 ActionBlock 中有位置为止。
猜你喜欢
  • 2018-08-29
  • 2012-08-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多