【问题标题】:Task-based idle detection基于任务的空闲检测
【发布时间】:2017-03-28 03:45:38
【问题描述】:

想要限制某些事件之间的间隔并在超出限制时采取措施并不罕见。例如,用于检测另一端是否处于活动状态的网络对等方之间的心跳消息。

在 C# async/await 风格中,可以通过在每次心跳到达时替换超时任务来实现:

var client = new TcpClient { ... };
await client.ConnectAsync(...);

Task heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}

这很方便,但是延迟Task的每个实例都拥有一个Timer,如果有很多心跳包在小于阈值的时间内到达,那就是很多Timer对象在加载线程池。此外,每个Timer 的完成都会在线程池上运行代码,无论是否有任何继续链接到它。

你不能通过调用heartbeatLost.Dispose()来释放旧的Timer;这将给出一个例外

InvalidOperationException: 只有处于完成状态的任务才能被释放

可以创建CancellationTokenSource 并使用它来取消旧的延迟任务,但是当计时器本身具有可重新调度的特性时,创建更多对象来完成此任务似乎不是最佳选择。

集成计时器重新调度的最佳方法是什么,以便代码可以更像这样的结构?

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
Task heartbeatLost = idleTimeout.Task;
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}

【问题讨论】:

  • 如果在读取过程中心跳失败,读取是否也会失败?如果您确实需要手动心跳,这可能与您的阅读无关,例如在另一个可以向您的读取操作发送取消请求的健康监控循环中?
  • 在第二次读取所需版本的代码时,“心跳”用于超时读取。不能为每次读取创建一个新的超时任务,或者直接在连接上设置超时吗?
  • 您愿意将Task heartbeatLost = idleTimeout.Task; 移动到while (...) 循环内吗?在循环之外,如果没有竞争条件,问题就很难解决。
  • @ScottChamberlain:是的,我假设延迟在触发后无法重置,但如果存在竞争条件,那么再次获取任务是完全可以的。
  • 竞争条件是如果heartbeatLostWhenAny 调用之后但在ResetDelay 之前转换到Completed 状态,那么你就搞砸了,因为你无法将任务移出 Completed一旦你进入它的状态。您必须为每个循环生成新的任务对象。

标签: c# timer async-await


【解决方案1】:

对我来说似乎很简单,你假设的班级的名字让你大部分时间都在那里。您只需要一个TaskCompletionSource 和一个不断重置的计时器。

public class TaskDelayedCompletionSource
{
    private TaskCompletionSource<bool> _completionSource;
    private readonly System.Threading.Timer _timer;
    private readonly object _lockObject = new object();

    public TaskDelayedCompletionSource(int interval)
    {
        _completionSource = CreateCompletionSource();
        _timer = new Timer(OnTimerCallback);
        _timer.Change(interval, Timeout.Infinite);
    }

    private static TaskCompletionSource<bool> CreateCompletionSource()
    {
        return new TaskCompletionSource<bool>(TaskCreationOptions.DenyChildAttach | TaskCreationOptions.RunContinuationsAsynchronously | TaskCreationOptions.HideScheduler);
    }

    private void OnTimerCallback(object state)
    {
        //Cache a copy of the completion source before we entier the lock, so we don't complete the wrong source if ResetDelay is in the middle of being called.
        var completionSource = _completionSource;
        lock (_lockObject)
        {
            completionSource.TrySetResult(true);
        }
    }

    public void ResetDelay(int interval)
    {
        lock (_lockObject)
        {
            var oldSource = _completionSource;
            _timer.Change(interval, Timeout.Infinite);
            _completionSource = CreateCompletionSource();
            oldSource.TrySetCanceled();
        }
    }
    public Task Task => _completionSource.Task;
}

这只会创建一个计时器并更新它,当计时器触发时任务完成。

您需要稍微更改代码,因为每次更新结束时间时都会创建一个新的 TaskCompletionSource,您需要将 Task heartbeatLost = idleTimeout.Task; 调用放入 while 循环中。

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task heartbeatLost = idleTimeout.Task;
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
    }
}

编辑:如果您关心完成源的对象创建(例如,您在游戏引擎中编程,其中 GC 收集是一个很大的问题),您可以添加额外的逻辑到OnTimerCallbackResetDelay 以在调用尚未发生并且您确定自己不在重置延迟范围内时重用完成源。

您可能需要从使用 lock 切换到 SemaphoreSlim 并将回调更改为

    private void OnTimerCallback(object state)
    {
        if(_semaphore.Wait(0))
        {
            _completionSource.TrySetResult(true);
        }
    }

稍后我可能会更新此答案以包含 OnTimerCallback 也会有的内容,但我现在没有时间。

【讨论】:

  • 我刚刚编辑了问题中的用例以在超时发生时执行break;,以明确在此之后计时器发生什么并不重要。
  • 顺便说一句,结果很好而且很简单。在看到 BCL 版本有多么复杂,使用 TimerPromise 辅助类等等之后,我有点犹豫是否要重新创建“围绕计时器的任务”。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-12-06
  • 2011-03-06
  • 1970-01-01
  • 2017-12-14
  • 2012-09-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多