【问题标题】:TPL Dataflow - Conditional loopsTPL 数据流 - 条件循环
【发布时间】:2016-12-16 09:27:18
【问题描述】:

目前我正在处理管道数据流,其中除了第 1 阶段之外的每个阶段都是async 正在运行的消费者和生产者。我有对象“流动”通过我的管道,这些对象引用了项目。在第 3 阶段,我想创建一个循环并缓冲所有满足特殊条件的对象(阶段循环)。

如果新对象进入(第 3 阶段)而当前有其他对象缓冲(阶段循环),我想检查它们是否在其引用项中匹配,如果匹配,则将它们发布到阶段循环的 BufferBlock

问题是,如何从 Stage 3 中检查 Stage Loop 中所有对象的引用项?

管道有点像这样:

Incoming objects ->  
  BufferBlock1 -> Parsing (Stage2) ->  
  BufferBlock2 -> Processing (Stage3) ->
  BufferBlock3 -> Stage Loop ->  
    Back to BufferBlock 2

【问题讨论】:

标签: c# .net task-parallel-library pipeline tpl-dataflow


【解决方案1】:

您的链中确实不需要那么多BufferBlockTPL Dataflow 包含一个 TransformBlock,它封装了 BufferBloсkActionBlock 逻辑,并有一个用于处理消息的输出块。

至于循环,您可以将块相互链接with static extension method,所以这可能是这样的

stage2.LinkTo(stage3, CheckForExistingProcessing);
stage2.LinkTo(stage4);

Jere stage4 是未通过检查的消息队列,必须循环处理。您可以设置额外的ActionBlock,或者,也许,只需使用TransformBlock 将消息再次发送到适当的阶段。我认为您也可以引入重试检查,因为某些消息可能根本无法处理,因此有些原因。

另外,正如你所说,你有async 逻辑,你可能应该SendAsync 消息而不是Post 它们(你也可以使用CancellationToken 的重载):

// asynchronously wait for a sending with resending attempts
await stage1.SendAsync(m);
// asynchronously wait for a sending with resending attempts with possible cancellation
await stage2.SendAsync(m, token);

Post 方法是同步的,如果目标不接受消息,丢弃消息,比较 SendAsync 方法,即使目标现在不能接受它也会尝试传递消息。 p>

【讨论】:

  • 我想我会遇到时间问题,其中 object1 进入,将在 CheckForExistingProcessing 中被拒绝并被转发到 stage4,而 object2 进入并通过检查,因为两者的引用项对象被释放它对它的锁定。因此,在进行检查之前,我需要检查 stage4 中是否有引用相同项目的对象。我需要的是某种队列,它根据锁检查以及是否已经有相同项目的锁定项目来推迟对象),但保留对象直到锁再次可用。
  • 你可以在这一步尝试BlockingCollection。
  • 我能够消除管道中的循环和锁定,并且我已经为它创建了一个测试类。一个问题是,如果我需要按文件进入的顺序导入文件,并且我在“一次一个文件”的基础上创建了管道。如果我调用 SendAsync,我如何确保它会按照它进入的顺序进行处理?我可以在管道之前添加一个 BufferBlock,它的消费者确保保持订单,但是有没有更智能的方法?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多