【问题标题】:DataflowEx never completesDataflowEx 永远不会完成
【发布时间】:2017-12-27 03:24:44
【问题描述】:

我正在尝试将开源库 DataflowEx 与下一个 Dataflow 声明一起使用。

class RequestClientFlow :Dataflow<string>{
    private readonly ILogger _logger;
    private readonly Dataflow<string, WebProxy> _webproxyDataflow;
    private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;

    public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){
        _logger = logger;
    }

    public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;

    public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){
        _webproxyDataflow = new TransformBlock<string,WebProxy>(s => {
            _logger.WriteLine("aaaa");
            return new WebProxy();
        }).ToDataflow();
        _httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => {
            _logger.WriteLine("bbbb");
            return new HttpClient();
        }).ToDataflow();
        _webproxyDataflow.LinkTo(_httpClientDataflow);
        RegisterChild(_webproxyDataflow);
        RegisterChild(_httpClientDataflow);
    }

    public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}

当我像消费它时

var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;

完成并显示我的输出

18:32:54.3773|aaaa 18:32:54.3773|bbbb

1 次通过,0 次失败,0 次跳过,耗时 1.45 秒(xUnit.net 2.3.1 构建 3858)。

但是我的理解来自我也应该能够使用的框架文档

    requestClientFlow.Complete();
    await requestClientFlow.CompletionTask;

甚至

await requestClientFlow.SignalAndWaitForCompletionAsync();

它没有完成。有人可以帮我理解我做错了什么吗?

【问题讨论】:

    标签: c# tpl-dataflow


    【解决方案1】:

    您的流程无法完成,因为最后一个块是 TransformBlock。在您的第一个示例中,您 await 完成了实际上完成的 Input 块。 Output 块无法完成,因为其输出缓冲区中的项目无处可去。 DataflowEx 库在流的最后一个块上是正确的 awaiting。您可以在末尾添加ActionBlockNullTarget 以实现补全。

    DataflowEx而言,最终流程应该正在实现

    public interface IDataflow<in TIn> : IDataflow
    {
        ITargetBlock<TIn> InputBlock { get; }
    }
    

    正如图书馆 github 页面上的示例所示:

    public class AggregatorFlow : Dataflow<string>
    {
        //...//
    
        public AggregatorFlow() : base(DataflowOptions.Default)
        {
            _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
            _dict = new Dictionary<string, int>();
    
            //***Note The ActionBlock here***
            _aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));
    
            //Block linking
            _splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });
    
            /* IMPORTANT */
            RegisterChild(_splitter);
            RegisterChild(_aggregater);
        }
    
        //...//
    }
    

    【讨论】:

    • 嗯谢谢你的答案,但仍然很难得到它。使用 ActionBlock 不允许我将转换后的对象公开到另一个可重用的组件或数据流中,对吗?那么您能否演示一下 DataflowBlock.NullTarget 选项?
    • ActionBlockNullTarget 都不会让您将生成的 HttpClient 流到另一个流中。如果这是您的要求,那么您的流程需要是 Dataflow&lt;string, HttpClient&gt;。在您的测试中,您可以使用来自DataflowExLinkLeftToNull() 链接该流。见#5 Advanced Linking
    • 另请注意,该示例不会在底层块上调用ToDataflow(),而是使用DataflowLinkOptions() { PropagateCompletion = true }在内部块之间显式传播完成
    猜你喜欢
    • 1970-01-01
    • 2017-01-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-29
    • 2018-05-11
    • 1970-01-01
    相关资源
    最近更新 更多