【问题标题】:Avoid shutting down an entire data flow network when one block is faulted避免在一个块出现故障时关闭整个数据流网络
【发布时间】:2017-11-08 03:36:54
【问题描述】:

我正在使用DataFlowEx,我想知道如何避免在引发异常时关闭整个 DataFlow。

我有一个系统,任务会随机进入,我希望网络记录故障,放弃该特定任务并继续执行其他任务。

在阅读有关 TPL 和 DataFlowEx 的文档时,特别是像

它[一个故障块]应该拒绝任何进一步的传入消息。 Here

DataflowEx 在异常处理上采用快速失败的方法,就像 TPL 数据流。当抛出异常时,低级块结束 首先是故障状态。然后是父级的 Dataflow 实例 失败块的通知。它会立即传播 致命错误:通知其其他孩子立即关闭。后 它所有的孩子都完成/完成,父数据流也来到 它的完成,原始异常包含在 CompletionTask 的状态也是 Faulted。 Here

看起来好像不是从失败中继续前进的块......

我的流程包含大量文件 IO,我预计偶尔会发生异常(网络卷在读/写期间脱机、连接失败、权限问题...)

我不希望整个管道都死掉。

这是我正在使用的代码示例:

using Gridsum.DataflowEx;
using System;
using System.IO;
using System.Threading.Tasks.Dataflow;

namespace DataManagementSystem.Data.Pipeline.Actions
{
    class CopyFlow : Dataflow<FileInfo, FileInfo>
    {
        private TransformBlock<FileInfo, FileInfo> Copier;
        private string destination;

        public CopyFlow(string destination) : base(DataflowOptions.Default)
        {
            this.destination = destination;

            Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));

            RegisterChild(Copier);            
        }

        public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }

        public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }

        protected virtual FileInfo Copy(FileInfo file)
        {
            try
            {
                return file.CopyTo(Path.Combine(destination, file.Name));
            }
            catch(Exception ex)
            {
                //Log the exception
                //Abandon this unit of work
                //resume processing subsequent units of work
            }

        }
    }
}

这是我将工作发送到管道的方式:

var result = pipeline.ProcessAsync(new[] { file1, file2 }).Result;

【问题讨论】:

    标签: c# tpl-dataflow


    【解决方案1】:

    如果一个块抛出一个Exception,它就会出错。如果您不希望管道失败,您可以不传播完成或处理Exception。处理异常可以采取多种形式,但听起来您只需要一次简单的重试。您可以使用try/catch 并实现自己的重试循环或使用Polly 之类的东西。一个简单的例子如下所示。

    public BuildPipeline() {
        var waitTime = TimeSpan.FromSeconds(1);
        var retryPolicy = Policy.Handle<IOException>()
                                .WaitAndRetryAsync(3, i => waitTime);
        var fileIOBlock = new ActionBlock<string>(async fileName => await retryPolicy.ExecuteAsync(async () => await FileIOAsync(fileName)));
    }
    

    注意:此代码未经测试,但应该让您朝着正确的方向前进。

    编辑

    您几乎拥有所需的一切。捕获异常并将其记录后,您可以返回 null 或其他一些标记,您可以将其从管道中过滤到 NullTarget。此代码确保NullTarget 过滤链接是Copier 上的第一个链接,因此任何空值都不会到达您的实际目的地。

    class CopyFlow : Dataflow<FileInfo, FileInfo> {
        private TransformBlock<FileInfo, FileInfo> Copier;
        private string destination;
    
        public CopyFlow(string destination) : base(DataflowOptions.Default) {
            this.destination = destination;
    
            Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));
            Copier.LinkTo(DataflowBlock.NullTarget<FileInfo>(), info => info == null);
    
            RegisterChild(Copier);
        }
    
        public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }
    
        public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }
    
        protected virtual FileInfo Copy(FileInfo file) {
            try {
                return file.CopyTo(Path.Combine(destination, file.Name));
            } catch(Exception ex) {
                //Log the exception
                //Abandon this unit of work
                //resume processing subsequent units of work
                return null;
            }
    
        }
    }
    

    【讨论】:

    • 感谢您的回答。你的说法很有趣,特别是关于“处理异常”。说而不是重试,我只想记录失败并完全停止该工作单元。具体来说,我的问题是,一旦在 actually 工作的函数内部,我无法弄清楚如何:捕获异常 -> 记录错误 -> 放弃该工作单元 -> 继续如果什么都没发生。问题是函数调用者期望函数返回后返回FileInfo
    • 我现在明白了。我的困惑源于我没有完全理解NullTarget 的使用。我会仔细阅读它,非常感谢!附言波莉看起来非常出色!我将立即开始使用,这正是我所需要的(并且会自己实现,相当粗糙)。
    猜你喜欢
    • 2019-09-20
    • 2020-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多