【问题标题】:Is there a TPL dataflow block that doesn't take input but returns output?是否存在不接受输入但返回输出的 TPL 数据流块?
【发布时间】:2025-12-24 12:00:16
【问题描述】:

我的问题的标题说明了一切。

我正在寻找不需要输入的 TPL 数据流块。

现在我正在使用一个转换块,但它的输入未被使用。

【问题讨论】:

  • 没有。为什么你也需要它?也许我们可以创造出类似的东西。
  • 我的第一个块从队列中提取数据。从队列中提取数据不需要任何输入。
  • @BilalFazlani 实际上,它确实有一个输入。队列本身。 TransformManyBlock 可以接收队列作为输入并将其内容作为输出返回。或者你可以用一个 BufferBlock 替换你的队列(这也是一个队列)。或者你可以使用一个循环从队列中弹出消息并将它们发布到第一个块
  • 然后传递对队列的引用!在委托本身中,使用yield return 立即推送每条消息,例如queue=>{ while (!queue.Length==0){var msg=queue.Pop(); yield return msg;}}
  • @BilalFazlani 您HAVE 在这种情况下对其进行参数化,以便能够针对暂存环境进行测试。我有很多关于意外使用生产队列 URL 的测试的故事。在这种情况下,输入可以是 SQS URL(如果您使用 IoC 容器,则可以是客户端类)

标签: c# multithreading asynchronous task-parallel-library tpl-dataflow


【解决方案1】:

我将从BufferBlock<T> 构建一个这样的块:该方法接受一个代表该块的ITargetBlock<T> 端并返回它的ISourceBlock<T> 端的委托。这样,delegate 可以向 block 发送输入,但从外部看,它看起来像一个只产生输出的 block。

代码:

public static ISourceBlock<T> CreateProducerBlock<T>(
    Func<ITargetBlock<T>, Task> producer,
    int boundedCapacity = DataflowBlockOptions.Unbounded)
{
    var block = new BufferBlock<T>(
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(async () =>
    {
        try
        {
            await producer(block);

            block.Complete();
        }
        catch (Exception ex)
        {
            ((IDataflowBlock)block).Fault(ex);
        }
    });

    return block;
}

示例用法:

var producer = CreateProducerBlock<int>(async target =>
{
    await target.SendAsync(10);
    await target.SendAsync(20);
});

ITargetBlock<int> consumer = …;

producer.LinkTo(consumer);

【讨论】:

  • 我一直在寻找 ISourceBlock 实现的示例,但这既简单又优雅!
【解决方案2】:

有时最简单的方法是使用一次性的 bool 作为 TransformManyBlock 的输入,并使用 .Post(true) 来启动您的管道。

【讨论】:

    最近更新 更多