【问题标题】:Most Optimal TPL Dataflow Design?最优化的 TPL 数据流设计?
【发布时间】:2012-06-28 09:55:24
【问题描述】:

我想请教如何使用 TPL 数据流设计最佳架构。我还没有编写代码,所以没有可以发布的示例代码。我也不是在寻找代码(除非自愿),但非常感谢设计方面的帮助:

要求如下:

我有 3 个以特定方式相互依赖的核心数据块。 Datablock1 是生产 Foo1 类型对象的生产者。 Datablock2 应该订阅 Foo1 对象(来自 Datablock1),并且可能(不是在每个 Foo1 上,受特定函数约束)生成 Foo2 对象,并将其存储在输出队列中以供其他数据块使用。 Datablock3 还使用 Foo1 对象(来自 Datablock1)并可能生成 Foo3 对象,Datablock2 使用这些对象并转换为 Foo2 对象。

总之,以下是数据块以及它们各自产生和消耗的内容:

  • Datablock1:生产(Foo1),消费(无)
  • Datablock2:生产(Foo2),消费(Foo1,Foo3)
  • Datablock3:生产(Foo3),消费(Foo1)

另一个要求是在 Datablock2 和 Datablock3 中几乎同时处理相同的 Foo1。如果 Foo1 对象首先由 Datablock2 使用,然后一旦 Datablock2 完成其工作,同样的 Foo1 对象将被发布到 Datablock3 以供其工作。 Datablock2 中的 Foo2 对象可以来自对 Foo1 对象或 Foo3 对象的操作。

我希望这是有道理的,如果仍然不清楚,我很乐意解释更多。

我的第一个想法是为 3 个数据块中的每一个创建 TPL 数据流块,并让它们处理不同对象类型的传入流。另一个想法是拆分数据块,让每个数据块只处理一种对象类型的流。您有什么建议,或者是否有更好的解决方案可能有效?

Svick 已经在 Datablock1 上提供了帮助,并且它已经投入使用,我只是纠结于如何将我当前的环境(如上所述)转换为 TPL 数据流。

非常感谢任何想法或建议。

【问题讨论】:

  • 如果您正在寻找最高效的架构,只有一个可以正确找到:测量不同的选项以找出哪个实际上是最好的。其他一切都只是猜测。
  • 完全同意,但在这一点上,我不确定数据块是否可以设置为接受来自与其链接的不同 ISourceBlocks 的对象。如果我知道如何编写一个可以采用例如 Foo1 和 Foo2 对象的数据块,将它们缓冲在 2 个单独的输入队列中并让传入的函数在其中一个上运行,那么我将能够自己设置一些测试架构。问题是 TPL 数据流文档目前几乎不存在。您能否帮助解决如何接受多个生产者将不同的对象传递给一个数据流块?
  • Foo1Foo3 是否共享一个公共基类?如果不是,为什么块 2 可以同时处理?
  • 他们不共享任何基类。关键是要弄清楚是否可以设计一个块来获取/处理两个不同的流(不同类型的对象)。这就是这个练习的重点,要弄清楚什么是可行的,什么是不可行的,因为 TPL 数据流文档没有太多可说的,至少我没有找到太多。我偶然发现了 BatchedJoinBlock 但不确定它是否适用于这里......
  • @svick,我的问题是数据流块是否可以设计为接受来自 2 个单独数据块的不同类型的流。我认为 TPL Dataflow 应该提供一种或另一种方式听起来像是一个合理的要求,但它只是文档太稀少了,以至于到目前为止我还没有看到任何提到可以自己处理多种数据类型的任何类型的块的任何内容相应的输入队列。

标签: c# architecture asynchronous concurrency tpl-dataflow


【解决方案1】:

让我们把这个问题一分为三,分别独立解决。

第一个是如何有条件地产生一个项目。我认为最好的选择是使用TransformManyBlock 并让您的函数返回一个包含一个或零个项目的集合。

另一个选项是link the two blocks conditionally,这样nulls 将被忽略并在您不想生成任何内容时返回null。 但是如果你这样做,你还必须将源链接到NullTarget,这样nulls 就不会留在它的输出缓冲区中。

第二个问题是如何将 Foo1s 发送到块 #2 和块 #3。我可以在这里看到两种方式:

  1. 使用链接到两个目标块(#2 和#3)的BroadcastBlock。请注意这一点,因为BroadcastBlock 没有输出队列,所以如果目标块推迟了一个项目,则意味着它不会处理它。因此,在这种情况下,您不应设置块#2 和#3 的BoundedCapacity。如果您不这样做,它们将永远不会推迟,所有消息都将由两个块处理。
  2. 在块 #2 处理 Foo1 后,手动将其 Post()(或更好,SendAsync())到块 #3。

我不确定“大约在同一时间”究竟是什么意思,但总的来说,TPL 数据流不对独立块的处理顺序做出任何保证。您可以使用a custom TaskScheduler 更改不同块的优先级,但我不确定这在这里是否有用。

最后一个也是最复杂的问题是如何在一个块中处理不同类型的项目。有几种方法可以做到这一点,但我不确定哪种方法最适合您:

  1. 不要在单个块中处理它们。有一个TransformBlock<Foo1, Foo2> 和一个TransformBlock<Foo3, Foo2>。然后,您可以将它们都链接到一个 BufferBlock<Foo2>
  2. 按照您的建议,使用BatchedJoinBlock<Foo1, Foo3>,将batchSize 设为1。这意味着生成的Tuple<IList<Foo1>, IList<Foo3>> 将始终包含一个Foo1 或一个Foo3
  3. 通过将BatchedJoinBlock 链接到产生更合适类型的TransformBlock 来增强以前的解决方案。这可以是Tuple<Foo1, Foo3>(其中一项将始终为null),也可以是类似F# Choice<Foo1, Foo3> 的东西,这样可以确保只设置两者之一。
  4. 从头开始创建一个新的块类型,这正是您想要的。它应该是ISourceBlock<Foo2>,并且还有两个属性:ITarget<Foo1> 类型的Target1ITarget<Foo3> 类型的Target2,就像内置的连接块一样。

使用选项 #1 和 #3,您还可以将块封装到单个自定义块中,从外部看起来像 #4 中的块,以便更容易重复使用。

【讨论】:

  • 我能澄清一下“有条件生产”的含义吗?这是必要的吗,因为 TransformBlocks 通常在输出队列中放置的项目数量与到达输入队列中的项目数量完全相同?在满足的条件下使用 Actionblock 和 Posting 会是一个肮脏的解决方案吗?
  • “大约在同一时间”我想指出这样一个事实,即由 DataBlock3 生成并由 DataBlock2 消耗的 Foo3 取决于 Foo1s 在 Datablock2 中设置的某些变量的状态.我认为解决此问题的唯一方法是合并数据块 2 和 3。或者为每种流类型使用不同的数据块。
  • 是的,有条件地生产意味着只有一些输入产生输出。 TransformBlock 生产的物品数量与消耗的物品数量相同,是的。一般来说,我认为链接比发布更可取,但在这种情况下,我想这是一个有效的选择。
  • 让一个块依赖于一些外部数据违背了 TPL 数据流的理念(以及它所建立的参与者模型)。如果某个块需要特定于某个项目的东西,它应该与该项目一起提供。正因为如此,手动Post() 可能是这里最好的选择,它将确保操作顺序正确。
  • 我选择了选项#1,它的代码更简洁,而且速度足够快。但是有一个问题:Actionblock 中是否有一种方法可以等待处理队列中的下一项,直到当前项不仅在操作块本身内部而且如果该项通过 Post(item) 传递到另一个数据块。是否有某种机制来报告当前项目已完全处理并指示 Actionblock 处理队列中的下一个项目?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多