【问题标题】:TPL Dataflow block which delays the forward of the message to the next block延迟消息转发到下一个块的 TPL 数据流块
【发布时间】:2014-07-09 13:12:34
【问题描述】:

我需要一个 Dataflow 块,它根据消息中的时间戳 (LogEntry) 将消息转发到下一个块。

这是我想出的,但感觉不对。有什么改进建议吗?

  private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }

【问题讨论】:

标签: c# .net task-parallel-library tpl-dataflow


【解决方案1】:

您可以简单地使用单个 TransformBlock,使用 Task.Delay 异步等待延迟:

IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(TimeSpan delay)
{
    return new TransformBlock<TItem, TItem>(async item =>
    {
        await Task.Delay(delay);
        return item;
    });
}

用法:

var block = DelayedForwardBlock<LogEntry>(TimeSpan.FromMinutes(5));

【讨论】:

  • 非常感谢。有时它可以如此简单:-)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-30
相关资源
最近更新 更多