【问题标题】:How to stop processing pipeline on faulty block?如何停止故障块上的处理管道?
【发布时间】:2016-07-18 04:36:19
【问题描述】:

如果其中一个块决定发生错误,我如何停止处理 DataFlow 块,从而阻止下一个块运行。我认为一个块可以引发异常,但不确定停止进一步处理管道的正确方法是什么。

更新:

private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells == null)
        return;

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    };

    blockPrepare.Complete();
    try
    {
        await blockTestMover.Completion;
    }
    catch(Exception ee)
    {
        Console.WriteLine(ee.Message);
    }

    Console.WriteLine("Done");
}

更新 2:

    public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                    Func<TInput, Task> action,
                    Action<Exception> exceptionHandler,
                    ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new ActionBlock<TInput>(async input =>
        {
            try
            {
                await action(input);
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
            }
        }, dataflowBlockOptions);
    }

【问题讨论】:

  • 你可以看看 Stephen Cleary 的简约 Try 库。它允许通过管道的所有块传递消息,然后观察该消息最后发生的任何异常。

标签: c# async-await task-parallel-library tpl-dataflow


【解决方案1】:

如果您想要的是块中的异常意味着当前项目确实在管道中走得更远,但其他项目的处理应该继续而不会中断,那么您可以通过创建一个在处理时产生一个项目的块来做到这一点成功,但在抛出异常时产生零项:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}

【讨论】:

  • 这正是我想要的!
  • 只是最后一件事.. 虽然 ActionBlock 是最后一个块,但我尝试使用类似的方法来创建它。我的帖子已更新,试图创建这种方法。但是编译器抱怨不能等待'void'。我能理解他,但不知道如何解决。
  • 将要运行的操作返回Task
【解决方案2】:

如果您有管道,您可能已经在使用PropagateCompletion = true。这意味着如果管道中的一个块因异常而失败,那么它之后的所有块也会失败。

剩下的就是停止失败块之前的所有块。为此,您可以等待管道中最后一个块的Completion。如果这样做会引发错误,请在第一个块上调用 Fault() 使其失败。代码可能如下所示:

// set up your pipeline

try
{
    await lastBlock.Completion;
}
catch (Exception ex)
{
    ((IDataflowBlock)firstBlock).Fault(ex);

    throw; // or whatever is appropriate to propagate the exception up
}

【讨论】:

  • 这是取消一切,甚至是那些没有故障的工作。我的意思是我在 foreach 循环中将许多作业发布到管道中,如果其中一个有故障,它们都会被取消。前 8 项(8 项,因为我有 8 核 CPU)正在运行,其中一项没有继续,其余的正在完成。但除了这 8 个项目之外,其他项目不再处理。如果没有发生异常,则处理所有项目。我在帖子中更新了源代码。
  • @Pablo 我认为这就是您想要的,“停止[在]管道中的进一步处理”。
  • 我认为管道是当我发布以阻止带有一些输入数据的作业时,根据链接块,该作业正在传递到另一个块等等。如果我只发布一次,那么区块链就会按预期被取消。但是,如果我多次发布,那么所有“帖子”都会被取消,即使是那些毫无例外地运行的帖子。换句话说,如果我将输入数据 [1, 2, 3, 4] 发布到块链,并且仅当我发布 2 时出现异常,那么 1、3、4 将按预期完成工作。我可能使用了错误的术语,但我不确定如何更好地描述。
猜你喜欢
  • 2016-08-30
  • 2013-03-02
  • 2023-01-02
  • 1970-01-01
  • 1970-01-01
  • 2018-10-16
  • 1970-01-01
  • 1970-01-01
  • 2018-06-09
相关资源
最近更新 更多