【问题标题】:Wait for previous blocks to finish processing before continuing在继续之前等待先前的块完成处理
【发布时间】:2014-12-29 18:18:32
【问题描述】:

我有一个看起来像这样的过程。

  1. 从文件夹中获取一组 CSV 文件
  2. 读取 CSV 文件,并将内容存储在数据库中
  3. 从数据库中读取数据并执行更多处理。

将第 2 步和第 3 步分开的原因是将读取文件所涉及的问题与处理文件所涉及的问题分开。

我可以用三个数据流块对此进行建模。我遇到的问题是,在所有文件都保存到数据库之前,我不希望块 3 启动。我需要某种方法来确定在块 1 中提取的所有文件都已由块 2 处理。块 2 将其 MaxDegreeOfParallelism 设置为 Unbounded - 我希望它们并行处理。

我考虑在前两个街区使用Encapsulate,但我认为这行不通。也许我需要某种Batchblock,但批次的大小并不相同。

我该怎么做?我需要创建自己的块类型吗?

【问题讨论】:

  • 是否必须同时从数据库中读取所有数据,或者您可以逐个文件读取它们,或者类似的方式?

标签: c# .net task-parallel-library tpl-dataflow


【解决方案1】:

这不适合单个 TDF 流,因为第 2 步不会将项目传递到第 3 步,第 3 步是在前一个已经完成之后开始的。

您应该有 2 个独立的流程。第一个从文件夹中读取并存储在数据库中,第二个从数据库中读取并开始处理。您可以通过等待Completion 属性来等待第一个流程完成:

var reader = // Create #1 block
var dbFiller = // Create #2 block

reader.LinkTo(dbFiller, new DataflowLinkOptions { PropagateCompletion = true }); // Link both blocks with Completion Propagation

reader.Post( // Queue up work for reader

await reader.Completion; // Asynchronously wait for previous steps to complete

var processor = // Create #3 block

processor.Post( // Queue up work for processor

【讨论】:

  • 不过,要这样做,我需要 Complete() 读者,不是吗?上面的代码中缺少哪个。我不想这样做,因为这是一个可重复的过程,我想重用管道。
  • @user1158174 但您说您希望仅在前一个完成后才开始处理。如果没有完成,你的意思是完成...
  • 是的。我想这确实是个问题,不是吗?步骤 1 将每隔十分钟运行一次。当它执行时,它可能会找到十个文件。我希望在第 3 步开始之前通过第 2 步处理所有这些。所以当我说完成时,我的意思是“完成了当前批次的文件”。这就是我卡住的地方,因为 Dataflow 不提供开箱即用的功能。 BatchBlock 是最接近的,但它需要特定大小的批次,这不是我所拥有的。
  • @user1158174 TDF 根本不提供。我建议您使用我的答案..但每 10 分钟调用一次并等待整个周期完成。然后等待 10 分钟再做一次.. 等等。
  • 好的,谢谢。我想我希望有一种方法可以将该行为封装在自定义块或其他东西中。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-09-16
  • 1970-01-01
  • 1970-01-01
  • 2020-02-24
相关资源
最近更新 更多