【问题标题】:Data Propagation in TPL Dataflow Pipeline with Batchblock.Triggerbatch()使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中进行数据传播
【发布时间】:2015-09-22 12:48:56
【问题描述】:

在我的生产者-消费者场景中,我有多个消费者,每个消费者都向外部硬件发送一个操作,这可能需要一些时间。我的管道看起来有点像这样:

BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks

我已将我的 ActionBlocks 的 BoundedCapacity 分配给 1。 理论上我想要的是,只有当我的一个 Actionblock 可用于操作时,我才想触发 Batchblock 将一组项目发送到 Transformblock。到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递给 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我确实对 BatchBlock 批量大小有一个非常高的上限,但是我真的不希望达到这个限制,我想根据执行上述任务的动作块的可用性来触发我的批次.

我在 Triggerbatch() 方法的帮助下实现了这一点。我将 Batchblock.Triggerbatch() 称为我的 ActionBlock 中的最后一个操作。然而有趣的是,经过几天的正常工作,管道已经大功告成。经过检查,我发现有时批处理块的输入是在 ActionBlock 完成其工作之后进入的。在这种情况下,ActionBlock 确实在其工作结束时调用了 Triggerbatch,但是由于此时 Batchblock 根本没有输入,所以对 TriggerBatch 的调用是没有结果的。一段时间后,当输入确实流入 Batchblock 时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些内容的东西,但是没有这样的功能可用,我也找不到检查 TriggerBatch 是否有效的方法。

谁能建议我的问题的可能解决方案。不幸的是,使用 Timer 来触发批处理对我来说不是一个选择。除了流水线的开始,节流应该只受其中一个 ActionBlock 的可用性控制。

示例代码在这里:

    static BatchBlock<int> _groupReadTags;

    static void Main(string[] args)
    {
        _groupReadTags = new BatchBlock<int>(1000);

        var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
        BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
        int batchNo = 1;


        TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
        {

            Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
            //_groupReadTags.TriggerBatch();
            int sum = 0;

            foreach (int item in list)
            {
                Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
                sum += item;

            }
            batchNo++;
            return sum;

        });

            ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
            {
                Console.WriteLine("Number from ONE :{0}",x);
                await Task.Delay(500);

                    Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);

                _groupReadTags.TriggerBatch();



        },consumerOptions);

        ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
        {
            Console.WriteLine("Number from TWO :{0}", x);
            await Task.Delay(2000);
            _groupReadTags.TriggerBatch();

        }, consumerOptions);

        _groupReadTags.LinkTo(_workingBlock);
        _workingBlock.LinkTo(_frameBuffer);
        _frameBuffer.LinkTo(_worker1);
        _frameBuffer.LinkTo(_worker2);

        _groupReadTags.Post(10);
        _groupReadTags.Post(20);
        _groupReadTags.TriggerBatch();

        Task postingTask = new Task(() => PostStuff());
        postingTask.Start();
        Console.ReadLine();

    }



    static void PostStuff()
    {


        for (int i = 0; i < 10; i++)
            {
                _groupReadTags.Post(i);
                Thread.Sleep(100);
            }

        Parallel.Invoke(
            () => _groupReadTags.Post(100),
            () => _groupReadTags.Post(200),
            () => _groupReadTags.Post(300),
            () => _groupReadTags.Post(400),
            () => _groupReadTags.Post(500),
            () => _groupReadTags.Post(600),
            () => _groupReadTags.Post(700),
            () => _groupReadTags.Post(800)
                       );
    }

【问题讨论】:

  • 限制是通过设置输入、输出链接选项的适当限制来实现的,例如通过设置DataflowLinkOptions.MaxMessages属性。但是,发布的代码甚至不会传播完成 - LinkTo(source,target) 重载不会传播完成。您需要使用接受链接选项和过滤谓词的the overload
  • @PanagiotisKanavos max messages 与节流无关。
  • @i3arnon 我只是指出应该使用数据流的机制来处理节流,而不是尝试从头开始模拟它们。从问题 why 设置边界不起作用并且代码没有帮助 - 完成没有传播并且唯一的边界等于 1
  • @PanagiotisKanavos 感谢您的评论。我真的不希望传播完成,因为这段代码实际上应该一直运行,如果我传播完成,我将不得不重新初始化我所有的 TPL 块,我不想这样做。另外,我没有过滤谓词,因为我不希望有条件地传播我的输出。这里的问题是另外一回事。我希望使用 TriggerBatch() 方法,一切都可以正常工作,除了我上面概述的情况,即在调用所有 TriggerBatch() 方法之后进入 Batchblock 的输入。
  • 重新表述了这个问题,以免造成任何混淆。

标签: c# pipeline throttling tpl-dataflow


【解决方案1】:

这是一个具有一些额外功能的替代BatchBlock 实现。它包含一个带有此签名的TriggerBatch 方法:

public int TriggerBatch(int nextMinBatchSizeIfEmpty);

如果输入队列不为空,调用此方法将立即触发批处理,否则将设置一个临时的MinBatchSize,仅影响下一个批处理。您可以使用较小的 nextMinBatchSizeIfEmpty 值调用此方法,以确保在当前无法生成批次的情况下,下一批将比块构造函数中配置的 BatchSize 更早发生。

此方法返回所生产的批次的大小。如果输入队列为空,或输出队列已满,或块已完成,则返回0

public class BatchBlockEx<T> : ITargetBlock<T>, ISourceBlock<T[]>
{
    private readonly ITargetBlock<T> _input;
    private readonly IPropagatorBlock<T[], T[]> _output;
    private readonly Queue<T> _queue;
    private readonly object _locker = new object();
    private int _nextMinBatchSize = Int32.MaxValue;

    public Task Completion { get; }
    public int InputCount { get { lock (_locker) return _queue.Count; } }
    public int OutputCount => ((BufferBlock<T[]>)_output).Count;
    public int BatchSize { get; }

    public BatchBlockEx(int batchSize, DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        dataflowBlockOptions = dataflowBlockOptions ?? new DataflowBlockOptions();
        if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded &&
            dataflowBlockOptions.BoundedCapacity < batchSize)
            throw new ArgumentOutOfRangeException(nameof(batchSize),
            "Number must be no greater than the value specified in BoundedCapacity.");

        this.BatchSize = batchSize;

        _output = new BufferBlock<T[]>(dataflowBlockOptions);

        _queue = new Queue<T>(batchSize);

        _input = new ActionBlock<T>(async item =>
        {
            T[] batch = null;
            lock (_locker)
            {
                _queue.Enqueue(item);
                if (_queue.Count == batchSize || _queue.Count >= _nextMinBatchSize)
                {
                    batch = _queue.ToArray(); _queue.Clear();
                    _nextMinBatchSize = Int32.MaxValue;
                }
            }
            if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);

        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1,
            CancellationToken = dataflowBlockOptions.CancellationToken
        });

        var inputContinuation = _input.Completion.ContinueWith(async t =>
        {
            try
            {
                T[] batch = null;
                lock (_locker)
                {
                    if (_queue.Count > 0)
                    {
                        batch = _queue.ToArray(); _queue.Clear();
                    }
                }
                if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
            }
            finally
            {
                if (t.IsFaulted)
                {
                    _output.Fault(t.Exception.InnerException);
                }
                else
                {
                    _output.Complete();
                }
            }
        }, TaskScheduler.Default).Unwrap();

        this.Completion = Task.WhenAll(inputContinuation, _output.Completion);
    }

    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception ex) => _input.Fault(ex);

    public int TriggerBatch(Func<T[], bool> condition, int nextMinBatchSizeIfEmpty)
    {
        if (nextMinBatchSizeIfEmpty < 1)
            throw new ArgumentOutOfRangeException(nameof(nextMinBatchSizeIfEmpty));
        int count = 0;
        lock (_locker)
        {
            if (_queue.Count > 0)
            {
                T[] batch = _queue.ToArray();
                if (condition == null || condition(batch))
                {
                    bool accepted = _output.Post(batch);
                    if (accepted) { _queue.Clear(); count = batch.Length; }
                }
                _nextMinBatchSize = Int32.MaxValue;
            }
            else
            {
                _nextMinBatchSize = nextMinBatchSizeIfEmpty;
            }
        }
        return count;
    }

    public int TriggerBatch(Func<T[], bool> condition)
        => TriggerBatch(condition, Int32.MaxValue);

    public int TriggerBatch(int nextMinBatchSizeIfEmpty)
        => TriggerBatch(null, nextMinBatchSizeIfEmpty);

    public int TriggerBatch() => TriggerBatch(null, Int32.MaxValue);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _input.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
    {
        return _output.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        return _output.ReserveMessage(messageHeader, target);
    }

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        _output.ReleaseReservation(messageHeader, target);
    }

    IDisposable ISourceBlock<T[]>.LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _output.LinkTo(target, linkOptions);
    }
}

TriggerBatch 方法的另一个重载允许检查当前可以生产的批次,并决定是否应该触发它:

public int TriggerBatch(Func<T[], bool> condition);

BatchBlockEx 类不支持内置BatchBlockGreedyMaxNumberOfGroups 选项。

【讨论】:

    【解决方案2】:

    我发现以这种方式使用TriggerBatch是不可靠的:

        _groupReadTags.Post(10);
        _groupReadTags.Post(20);
        _groupReadTags.TriggerBatch();
    

    显然TriggerBatch 打算在块内部使用,而不是像这样在块外部使用。我已经看到这会导致奇怪的计时问题,例如下一批批次中的项目被包含在当前批次中,即使首先调用了 TriggerBatch。

    请参阅我对这个问题的回答,以了解使用 DataflowBlock.Encapsulate 的替代方法:BatchBlock produces batch with elements sent after TriggerBatch()

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-21
      • 2016-04-18
      相关资源
      最近更新 更多