【问题标题】:Producer/Consumer with BufferBlock, how to awake periodically, even if there is no product in the buffer?带BufferBlock的Producer/Consumer,即使buffer中没有product,如何周期性唤醒?
【发布时间】:2021-04-25 19:03:41
【问题描述】:

上下文

在我的 .NET 5 控制台应用程序中,我的使用者作为线程(任务)运行。目前只有一个,但未来可能会在同一个 BufferBlock 实例上同时拥有赛车消费者。

当前的实现运行良好。 我坚持实现以下附加功能:

我想在一个配置的时间段内醒来,即使缓冲区是空的,仍然留在主循环中。 此用例必须与令牌的 IsCancellationRequested 唤醒区分开来, 但可选地将其与“产品可供消费”区分开来并不是强制性的。

我确实看到ReceiveAsync 有超时,但不清楚OutputAvailableAsync 是如何参与的,它不接受超时。

问题

如何实现在给定时间段内唤醒,并保持在循环中。只有在 IsCancellationRequested 应该打破循环的情况下

public class MyConsumer
{
    private readonly BufferBlock<MyProduct> _products;

    public void Start(CancellationToken token)
    {
        Task.Factory.StartNew(() => Run(token), token);
    }

    private async Task Run(CancellationToken token)
    {
        await ConsumeAsync(token);
    }

    private async Task ConsumeAsync(CancellationToken token)
    {
        while (await _products.OutputAvailableAsync(token))
        {
            var product = await _products.ReceiveAsync(token);
            // Consume product goes here:...

            // I would like wake up here in a configured period, even the buffer is empty.
            //
            // How to implement this timeout based wake up? (then still remain in the loop)
            // I do not even understand clearly why are we using the two waiting operations, 
            // the 1) OutputAvailableAsync(token) then 2) ReceiveAsync(token)
        }
    }
}

【问题讨论】:

  • 关于我什至不明白为什么我们使用两个等待操作 -> 你可以用while(true) 替换await _products.OutputAvailableAsync(token) 因为ReceiveAsync 会等到输出是可用的,但是您需要检查该块是否已完成,因此您自己不会在未来产生任何东西。 OutputAvailableAsync 在没有更多输出可用时更容易退出。
  • 我想在配置的时间里在这里醒来,即使缓冲区是空的。我不明白用例。如果缓冲区为空,则没有可用的产品。您不能创建一个单独的方法来做任何您想做的事情并在所需的时间调用该方法吗?

标签: .net .net-core async-await task-parallel-library producer-consumer


【解决方案1】:

我对@9​​87654321@ 不是很熟悉,但在一般意义上:如果async API 不提供超时,您可以通过取消令牌模拟相同超时 - 仍然尊重现有令牌:

using var cts = new CancellationTokenSource();
// link the existing CancellationToken so that *it* can propagate cancellation
using var linked = token.Register(
    static s => ((CancellationTokenSource)s).Cancel(), cts, false);
// add a timeout
cts.CancelAfter(yourTimeoutHere);
// use this new combined token to do the magic
await DoSomethingAsync(cts.Token);

(如果您使用的不是 C# 9 或更高版本,请删除 static;这只是验证我们没有由于回调中捕获的变量而导致额外的分配)


一旦你有了它,你可以简单地响应你的超时作为增量工作,捕捉OperationCanceledException,然后做你需要的事情。需要注意的是,如果发生取消,请了解DoSomethingAsync 中留下的状态。这将是特定场景的。

【讨论】:

  • 我不认为 如果异步 API 不提供超时 是这里的情况。它确实支持超时。看起来实际的问题是如何在方法等待缓冲区块的输出时输入方法。
  • @PeterBons 也许我不够明确;我会添加一个段落...
  • 谢谢,我认为这大大改善了答案。
  • @PeterBons .OutputAvailableAsync(token) 不提供超时。控制流在那里异步等待,直到发生取消或缓冲区中有可用项。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-08-06
  • 1970-01-01
  • 1970-01-01
  • 2021-11-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多