【问题标题】:TPL dataflow process N latest messages(TPL 数据流:处理最新的 N 条消息)
【发布时间】:2021-10-21 12:39:16
【问题描述】:

我正在尝试创建某种队列来处理收到的 N 条最新消息。现在我有这个:

private static void SetupMessaging()
{
    // Settings for the broadcast stage. A BoundedCapacity of 1 was also tried here and is
    // left disabled. NOTE(review): BroadcastBlock's constructor takes DataflowBlockOptions,
    // so the execution-specific settings below are presumably not used by it - confirm.
    var broadcastOptions = new ExecutionDataflowBlockOptions
    {
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1
    };

    // Settings for the processing stage: sequential, ordered, BoundedCapacity = 2.
    var processingOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 2,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1
    };

    // Simulated slow consumer: print the message, then block for five seconds.
    void ProcessMessage(string text)
    {
        Console.WriteLine(text);
        Thread.Sleep(5000);
    }

    // Identity cloning function: every target receives the same string instance.
    _messagingBroadcastBlock = new BroadcastBlock<string>(text => text, broadcastOptions);
    _messagingActionBlock = new ActionBlock<string>(ProcessMessage, processingOptions);

    // Per the question, posting 1..5 through this pipeline yields 1, 2, 5.
    var completionLink = new DataflowLinkOptions { PropagateCompletion = true };
    _messagingBroadcastBlock.LinkTo(_messagingActionBlock, completionLink);
    // A NullTarget sink is linked as well.
    _messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
}

问题是如果我将 1,2,3,4,5 发布到它,我会得到 1,2,5,但我希望它是 1,4,5。欢迎提出任何建议。
更新 1(UPD 1)
我用下面这个方案实现了想要的效果:

/// <summary>
/// Wrapper around an <see cref="ActionBlock{TInput}"/> that keeps at most
/// <c>BoundedCapacity</c> messages waiting to be processed. When a new message pushes the
/// number of waiting messages over that limit, the oldest waiting message is cancelled
/// (skipped) instead of the new message being rejected.
/// </summary>
/// <remarks>
/// A message stops counting as "waiting" the moment its processing starts; from then on it
/// can no longer be cancelled. With <see cref="DataflowBlockOptions.Unbounded"/> nothing is
/// ever skipped. All bookkeeping happens under <c>_syncRoot</c>, so <see cref="Post"/> may be
/// called from several threads.
/// </remarks>
class FixedCapacityActionBlock<T>
{
    private readonly ActionBlock<CancellableMessage<T>> _actionBlock;

    // Messages handed to _actionBlock whose processing has not started yet, oldest first.
    // Guarded by _syncRoot.
    private readonly System.Collections.Generic.LinkedList<CancellableMessage<T>> _waiting =
        new System.Collections.Generic.LinkedList<CancellableMessage<T>>();

    // Maximum number of waiting messages, or DataflowBlockOptions.Unbounded (-1).
    private readonly int _maxQueueSize;

    private readonly object _syncRoot = new object();

    /// <summary>Creates the block.</summary>
    /// <param name="act">Action invoked for every message that is not skipped.</param>
    /// <param name="opt">Options for the inner block. <c>BoundedCapacity</c> is used as the
    /// waiting-message limit of this wrapper and is not applied to the inner block.</param>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> or <paramref name="opt"/> is null.</exception>
    public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
    {
        if (act == null) throw new ArgumentNullException(nameof(act));
        if (opt == null) throw new ArgumentNullException(nameof(opt));

        var options = new ExecutionDataflowBlockOptions
        {
            EnsureOrdered = opt.EnsureOrdered,
            CancellationToken = opt.CancellationToken,
            MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
            MaxMessagesPerTask = opt.MaxMessagesPerTask,
            NameFormat = opt.NameFormat,
            SingleProducerConstrained = opt.SingleProducerConstrained,
            TaskScheduler = opt.TaskScheduler,
            // BoundedCapacity is intentionally not copied: the inner block must accept every
            // message so that dropping is decided by this wrapper, not by the block declining posts.
        };

        _maxQueueSize = opt.BoundedCapacity;

        _actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
        {
            lock (_syncRoot)
            {
                if (cmsg.CancellationTokenSource.IsCancellationRequested)
                {
                    // Skipped by Post in favour of a newer message; Post already removed it
                    // from _waiting, so nothing else references it any more.
                    cmsg.Dispose();
                    return;
                }

                // Processing starts now: free the waiting slot so Post can no longer cancel it.
                _waiting.Remove(cmsg);
            }

            try
            {
                act(cmsg.Message);
            }
            finally
            {
                cmsg.Dispose();
            }
        }, options);
    }

    /// <summary>
    /// Queues <paramref name="msg"/> for processing and cancels the oldest waiting message(s)
    /// if the number of waiting messages now exceeds the configured capacity. The newly posted
    /// message itself is never the one cancelled.
    /// </summary>
    /// <returns><c>false</c> if the inner block declined the message (for example because it
    /// has completed or faulted); otherwise <c>true</c>.</returns>
    public bool Post(T msg)
    {
        var fullMsg = new CancellableMessage<T>(msg);

        lock (_syncRoot)
        {
            // Record the message before handing it to the block. The processing delegate takes
            // the same lock before touching _waiting, so even if it runs inline on this thread
            // it will find the message here.
            _waiting.AddLast(fullMsg);

            if (!_actionBlock.Post(fullMsg))
            {
                _waiting.Remove(fullMsg);
                fullMsg.Dispose();
                return false;
            }

            if (_maxQueueSize != DataflowBlockOptions.Unbounded)
            {
                while (_waiting.Count > _maxQueueSize)
                {
                    var oldest = _waiting.First.Value;
                    _waiting.RemoveFirst();
                    oldest.CancellationTokenSource.Cancel();
                }
            }

            return true;
        }
    }
}

/// <summary>
/// Couples a payload with a dedicated <see cref="System.Threading.CancellationTokenSource"/>,
/// so that a single queued message can be flagged as cancelled independently of the others.
/// </summary>
/// <typeparam name="T">Type of the wrapped payload.</typeparam>
class CancellableMessage<T> : IDisposable
{
    /// <summary>Creates the wrapper with a fresh, not-yet-cancelled token source.</summary>
    /// <param name="msg">Payload to wrap.</param>
    public CancellableMessage(T msg)
    {
        Message = msg;
        CancellationTokenSource = new CancellationTokenSource();
    }

    /// <summary>The wrapped payload.</summary>
    public T Message { get; set; }

    /// <summary>Token source owned by this message; released by <see cref="Dispose"/>.</summary>
    public CancellationTokenSource CancellationTokenSource { get; set; }

    /// <summary>Disposes the token source, if one is currently set.</summary>
    public void Dispose() => CancellationTokenSource?.Dispose();
}

虽然这可行并且实际上完成了这项工作,但这个实现看起来很脏,也可能不是线程安全的。

【问题讨论】:

  • 您能否更详细地解释一下这段代码的用途?

标签: c# .net tpl-dataflow


【解决方案1】:

这是一个 TransformBlock/ActionBlock 的实现:当已达到 BoundedCapacity 限制后又收到新消息时,它会丢弃队列中最旧的消息。它的行为与配置了 BoundedChannelFullMode.DropOldest 的 Channel 非常相似。

/// <summary>
/// Creates a propagator block that applies <paramref name="transform"/> to its inputs but keeps
/// at most <c>BoundedCapacity</c> inputs waiting. When a new input arrives while that many are
/// waiting, the oldest waiting input is dropped (similar to a bounded Channel configured with
/// <c>BoundedChannelFullMode.DropOldest</c>).
/// </summary>
/// <param name="transform">Asynchronous transformation applied to every non-dropped input.</param>
/// <param name="dataflowBlockOptions">Options for the processing stage; the caller's instance is
/// copied, never modified. <c>BoundedCapacity</c> bounds both the waiting inputs and the output
/// buffer; when it is Unbounded nothing is dropped.</param>
/// <param name="droppedMessages">Optional receiver notified with each dropped input.</param>
/// <returns>A block whose target side is the input stage and whose source side emits the
/// transformed results.</returns>
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    // Inputs waiting to be transformed, oldest first. Guarded by lock (queue).
    var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));

    var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
    {
        BoundedCapacity = boundedCapacity,
        CancellationToken = cancellationToken
    });

    // Copy the caller's options instead of temporarily mutating them, so a shared options
    // instance is never observed in a modified state and nothing needs restoring on failure.
    // BoundedCapacity is deliberately left Unbounded: inputBlock posts one token only when the
    // queue grows, so unprocessed tokens never outnumber queue.Count, which is already bounded
    // by boundedCapacity. A bounded token block could silently decline a token and strand a
    // queued input.
    var processingOptions = new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        SingleProducerConstrained = dataflowBlockOptions.SingleProducerConstrained,
        TaskScheduler = dataflowBlockOptions.TaskScheduler,
    };

    // Each (null) message is a token meaning "process the oldest queued input".
    var transformBlock = new ActionBlock<object>(async _ =>
    {
        TInput item;
        lock (queue)
        {
            // Defensive: the queue can be empty after it is cleared on completion.
            if (queue.Count == 0) return;
            item = queue.Dequeue();
        }
        var result = await transform(item).ConfigureAwait(false);
        await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
    }, processingOptions);

    var inputBlock = new ActionBlock<TInput>(item =>
    {
        var droppedEntry = (Exists: false, Item: (TInput)default);
        lock (queue)
        {
            if (queue.Count == boundedCapacity)
            {
                // Full: the oldest input is replaced by the new one. The token already pending
                // for the dropped input now serves the new one, so no extra token is posted.
                droppedEntry = (true, queue.Dequeue());
            }
            else
            {
                // Posted under the lock: the token handler takes the same lock, so it cannot
                // look at the queue before the item below has been enqueued.
                transformBlock.Post(null);
            }
            queue.Enqueue(item);
        }
        if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken
    });

    _ = PropagateCompletion(inputBlock, transformBlock);
    _ = PropagateFailure(transformBlock, inputBlock);
    _ = PropagateCompletion(transformBlock, outputBlock);
    _ = transformBlock.Completion.ContinueWith(_ => { lock (queue) queue.Clear(); },
        TaskScheduler.Default);

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    // Completes or faults target once source has finished. The await's exception is ignored
    // because the outcome is read from source.Completion afterwards.
    async Task PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }

    // Faults target if source faults (e.g. the transform threw); otherwise does nothing.
    async Task PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
    }
}

// Overload with synchronous lambda
/// <summary>
/// Overload of CreateTransformBlockDropOldest that accepts a synchronous transform; it wraps
/// the transform's result in a completed task and forwards all other arguments unchanged.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    // Validated here because the wrapping lambda below is never null, so the asynchronous
    // overload's null check cannot catch a null transform. Without this check the error would
    // only surface later, as a NullReferenceException that faults the block.
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
        dataflowBlockOptions, droppedMessages);
}

// ActionBlock equivalent
/// <summary>
/// ActionBlock-style counterpart of CreateTransformBlockDropOldest: runs
/// <paramref name="action"/> for every non-dropped input and discards the (null) results.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Func<TInput, Task> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (action is null) throw new ArgumentNullException(nameof(action));

    // Adapt the action to a transform whose result carries no information.
    Func<TInput, Task<object>> invokeAndDiscard = async input =>
    {
        await action(input).ConfigureAwait(false);
        return null;
    };

    var propagator = CreateTransformBlockDropOldest<TInput, object>(
        invokeAndDiscard, dataflowBlockOptions, droppedMessages);

    // The results are meaningless, so drain them into a NullTarget sink.
    propagator.LinkTo(DataflowBlock.NullTarget<object>());
    return propagator;
}

// ActionBlock equivalent with synchronous lambda
/// <summary>
/// Overload of CreateActionBlockDropOldest that accepts a synchronous action; it wraps the
/// action in a delegate returning a completed task and forwards all other arguments unchanged.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Action<TInput> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    // Validated here because the wrapping lambda below is never null, so the asynchronous
    // overload's null check cannot catch a null action. Without this check the error would
    // only surface later, as a NullReferenceException that faults the block.
    if (action == null) throw new ArgumentNullException(nameof(action));
    return CreateActionBlockDropOldest(
        item => { action(item); return Task.CompletedTask; },
        dataflowBlockOptions, droppedMessages);
}

思路是:把排队的项目存放在一个辅助的 Queue 中,同时向内部的 ActionBlock<object> 传递虚拟的(null)值。该块忽略作为参数传入的值,而是从队列中取出一个项目(如果有的话)。这里使用 lock 来确保队列中所有未被丢弃的项目最终都会被处理(当然,发生异常的情况除外)。

还有一个额外的功能:可选参数 IProgress<TInput> droppedMessages 可以在每次有消息被丢弃时收到通知。

使用示例:

// Keep at most two messages waiting; older ones are dropped and reported.
var dropOldestOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };
var reportDropped = new Progress<string>(dropped => Console.WriteLine($"Message dropped: {dropped}"));

_messagingActionBlock = CreateActionBlockDropOldest<string>(
    msg =>
    {
        // Simulated slow work.
        Console.WriteLine($"Processing: {msg}");
        Thread.Sleep(5000);
    },
    dropOldestOptions,
    reportDropped);

【讨论】:

    【解决方案2】:

    TPL Dataflow 不适合 Last N messages,因为它是队列或管道 (FIFO),而不是堆栈 (LIFO)。您真的需要使用数据流库来执行此操作吗?

    使用 ConcurrentStack<T> 更容易:只需引入一个生产者任务,负责把消息压入栈;再引入一个消费者任务,在已处理的消息数少于 N 时从栈中取出消息进行处理(更多内容参见生产者-消费者模式,More about Producer-Consumer)。

    如果您确实需要 TPL Dataflow,可以在消费者任务中用它来处理最新的消息,但不要在生产者中使用它,因为那并不是它的设计用途。此外,还有一些基于事件驱动架构的库,可能更适合解决您的问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多