【问题标题】:Use TPL Dataflow to encapsulate pipeline ending in an action block使用 TPL Dataflow 封装以操作块结尾的管道
【发布时间】:2015-09-08 14:02:08
【问题描述】:

TPL Dataflow 提供了一个非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

使您能够将多个块封装到一个转换块中。它返回一个

IPropagatorBlock<TInput, TOutput>

代表管道的开始和结束块。

但是,如果我的管道中的最后一个块是 ActionBlock,我不能使用它,因为 ActionBlock 不是 SourceBlock,并且函数的返回类型将是 ITargetBlock,而不是 IPropagatorBlock。

基本上,我正在寻找的是这样的功能:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

这是一个明智的写法,还是我错过了一些简单的东西?我不太确定to 是如何编写它的——尤其是连接完成。我需要创建自己的自定义块类型吗?

编辑:

好的,在阅读了@Panagiotis Kanavos 的回复并进行了一些修改后,我想出了这个。这是基于 EncapsulatingPropagator 类,这是现有 DataflowBlock.Encapsulate 方法使用的:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            this.startBlock.Fault(exception);
        }

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, 
            TStart messageValue, 
            ISourceBlock<TStart> source, 
            bool consumeToAccept)
        {
            return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }

【问题讨论】:

  • 我刚刚发布了一个类似的更新!使这个通用化做得很好 - 这允许您将构建模块/片段与样板分开
  • 能够将“终止”管道的初始目标和完成视为单个类似 ActionBlock 的实体似乎是如此明显的需要,以至于 API 中的差距几乎感觉......有目的。但是我会吹口哨经过墓地并在这里使用你的工作。谢谢。

标签: c# task-parallel-library tpl-dataflow


【解决方案1】:

Encapsulate 不用于抽象现有的管道,它用于创建一个 propagator 块,该块需要使用现有块和链接无法实现的自定义行为。

例如,Sliding Window 示例缓冲所有发布到其输入块的传入消息,并在滑动窗口到期时将所有检索到的消息输出到其输出块。

方法的名称会造成很多混乱,但当您了解它们的目的时它们确实有意义:

  • target 参数是目标(输入)端点,preceding 块将连接到该端点以发送消息。在这种情况下,处理传入消息并决定是否发布到输出(源)块的 ActionBlock 是有意义的。
  • source 参数是成功 步骤将连接到的源(输出)端点以接收消息。将 ActionBlock 用作源没有意义,因为它没有任何输出。

接受 ActionBlock 方法为 sourceEncapsulate 变体没有用处,因为您可以简单地从任何先前步骤链接到操作块。

编辑

如果你想模块化一个管道,即将它分解成可重用、更易于管理的你可以创建一个构造类,你可以使用一个普通的旧类。在该类中,您可以正常构建管道片段,链接块(确保传播完成),然后将第一步和最后一步的 Completion 任务公开为公共属性,例如:

class MyFragment
{
    public TransformationBlock<SomeMessage,SomeOther> Input {get;}

    public Task Completion {get;}

    ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
        Input.LinkTo(_finalBlock,linkOptions);
    }

    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

要将片段连接到管道,您只需从管道块链接到暴露的Input 块。要等待完成,只需等待暴露的 Completion 任务。

如果你愿意,你可以在这里停下来,或者你可以实现ITargetBlock 使片段看起来像一个目标块。您只需要将所有方法委托给 Input 块,并将 Completion 属性委托给 final 块。

例如:

class MyFragment:ITargetBlock<SomeMessage> 
{
    ....

    public Task Completion {get;}

    public void Complete()
    {
        Input.Complete()
    };

    public void Fault(Exception exc)
    {
        Input.Fault(exc);
    }

    DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
    {
        return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}

编辑 2

使用@bornfromanegg 的第一类可以将构建片段的行为与暴露输入和完成的样板分开:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    return new EncapsulatingTarget(input,finalBlock);
}

【讨论】:

  • 我同意一点。事实上,将 ActionBlock 用作源不仅没有意义 - 因为它没有实现 ISourceBlock,所以将其用作源也不可能。但是我还有一个问题要解决:我有一个管道,其中最后一个块是一个动作块,我想封装它。
  • Encapsulate 方法有一个非常具体(且有限)的目的——它不是一个通用的封装机制。当您需要在管道中包含具有明确定义的输入和输出端点的复杂图形或代码时,它很有用。这是实现自己的块类的捷径。在您的情况下,您只需要从管道的其余部分到您的第一个块的链接,并等待最后一个块的 Completion 任务 - 就像您使用任何管道一样。您甚至可以将块包含在一个类中,将输入块和 ActionBlock 的完成任务作为属性公开。
  • 我认为您在这里的真正意思是模块化,而不是封装管道。
  • 是的,这也许就是我的意思。我将编辑问题。
  • 精彩——这正是我想要的。谢谢。
【解决方案2】:

在我的例子中,我想封装一个包含多个最终 ActionBlocks 的网络,并带有一个总结完成,因此编辑问题中概述的解决方案不起作用。

因为与“最终块”的唯一交互围绕完成,所以只呈现封装的完成任务就足够了。 (根据建议添加了目标操作构造函数。)

public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
    private readonly ITargetBlock<TInput> startBlock;

    private readonly Task completion;

    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
    {
        this.startBlock = startBlock;
        this.completion = completion;
    }

    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        this.startBlock = startBlock;
        completion = endBlock.Completion;
    }

    public Task Completion => completion;

    public void Complete()
    {
        startBlock.Complete();
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }

        startBlock.Fault(exception);
    }

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}

示例用法:

public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
    var splitter = new BroadcastBlock<Client.InputRecord>(null);
    var getresults = new TransformManyBlock(...);    // propagator
    var saveinput = new ActionBlock(...);
    var saveresults = new ActionBlock(...);

    splitter.LinkTo(saveinput, PropagateCompletion);
    splitter.LinkTo(getresults, PropagateCompletion);
    getresults.LinkTo(saveresults, PropagateCompletion);

    return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}

我本可以签名 EncapsulatingTarget&lt;T&gt;(ITargetBlock&lt;T&gt; target, params Task[] completions) 并将 WhenAll(...) 移动到构造函数中,但不想对所需的完成通知做出假设。

【讨论】:

  • 这看起来像是一个有用的补充。事实上,我看不出为什么你不能拥有both 那些构造函数,以及采用ActionBlock 的原始构造函数。这样你就可以让调用者选择使用哪个。
猜你喜欢
  • 2021-09-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-25
  • 1970-01-01
  • 1970-01-01
  • 2018-02-17
  • 1970-01-01
相关资源
最近更新 更多