【问题标题】:Synchronize Data with Async functions使用异步函数同步数据
【发布时间】:2016-08-20 04:35:22
【问题描述】:

我有一个与一些外部硬件的异步接口,它允许读取和写入值。

假设它看起来像这样:

interface IAsyncConnection
{
    IReadOnlyDictionary<string,object> ReadAsync(IReadOnlyList<string> keysToRead)
    void WriteAsync(IReadOnlyDictionary<string,object> values) 
}

硬件和协议本身是线程安全的,所以如果我同时调用ReadAsyncWriteAsync,这两种方法中的一种获得锁并首先执行,而另一种方法只需要更长的时间。

由于协议不支持更改通知,我实现了某种轮询循环:

IReadOnlyDictionary<string,object> oldValues = null;
while(true)
{
    Task minimumTime = Task.Delay(100);
    var newValues = await this.connection.ReadAsync(valuesToRead);
    var changedValues = this.GetChangedValues(newVales, oldValues)

    this.Update(changedValues); //Actually delegates ViewModel changes and updates the UI
    await minimumTime; //just some sample
}

现在,如果我在轮询循环位于 GetChangedValues 内的那一刻调用 write,我会遇到麻烦,因为使用旧值调用 Update 方法,这确实在写入之前将 UI 设置回旧值(这会导致进一步的问题撤消/重做堆栈等)

现在,如果读取是同步的,我只需使用可以锁定的SyncRoot 属性扩展连接。并在写入和更新的对象之间创建一些同步,以正确获取当前更改的值。

如何使用异步方法实现类似的功能?

编辑: 澄清一下,SemaphoreSlim SyncRoot 不支持重入(它应该如何,ofc),这意味着如果我在IAsyncConnection 内创建某种锁定并通过属性使其在外部可用,则我无法在 @ 中调用 WaitAsync 987654330@ 类和IAsyncConnection 内,因为这将重新进入SemaphoreSlim。我当然可以通过 Connection 的 WriteAsync 方法获得SemaphoreSlimPollingLoop 获得SemaphoreSlim 的方式来实现它。但这似乎是某种神奇的(不可维护的)同步,因为真正的 Connection 有大约 12 个不同时运行的方法。

【问题讨论】:

  • 我的建议是发布更多代码并更清楚地解释锁定方案。我的预感是您需要从概念上清理锁定方案。没有 API 修复可能,这就是我删除答案的原因。作为一般规则的可重入性指向糟糕的设计。
  • 你能解释一下为什么重入是不好的设计吗?
  • blog.stephencleary.com/2013/04/recursive-re-entrant-locks.html 这不是硬性规定,但正如您所见,它往往会导致问题。
  • Strange 从来没有遇到过任何问题,但是我会尝试将设计更改为非递归锁,看看是否能顺利进行..

标签: c# synchronization locking semaphore


【解决方案1】:

我认为,您需要的是两个独立的任务 - 一个以 100 毫秒的间隔轮询硬件(协议),将数据(如果有)推送到 BlockingCollection,第二个任务是在新数据时从 BlockingCollection 读取数据真的到了。

然后,计算从 BlockingCollection 读取的秒任务中的更改值。

BlockingCollection 的优点是您可以执行以下操作:

_queue = new BlockingCollection<KeyValuePair<string,object>>();
// The poll task does pure protocol-polling
// I assume that a read with no hardware data returns empty
// IEnumerable of KeyValuePair (immediately)
// You can have it async but thats because of the IO-operation, not
// Because you're waiting for data. The polltask does what you want
_pollTask = Task.Run(() =>
{
    while(!token.IsCancellationRequested)
    {
        var newValues = this.connection.Read(valuesToRead);
        foreach(var newValue in newValues)
        {
            // Next, I try to add new data and wait max 1 second
            // if the queue is full to wait for the second task to read
            if(!_queue.TryAdd(newValue, 1000, token))
            {
                 if(!token.IsCancellationRequested)
                 {
                     // Log that queue was full
                 }
            }
        }
        token.WaitHandle.WaitOne(100); // Sleeps 100ms    
    }
}

//
// The read task blocks forever (at TryTake) if no data exists (has been written
// by the poll-task), and executes check for changed values and updates
// the gui if, and only if, new data actually exists
_readTask = Task.Run(() =>
{
    while(!token.IsCancellationRequested)
    {
        KeyValuePair<string, object> nextItem;
        if(_queue.TryTake(out nextItem, Timeout.Infinite, token))
        {
            if(!token.IsCancellationRequested && this.ValueChanged(nextItem, oldValues) 
            {
                 this.Update(nextItem);
            }
        }
    }
}

【讨论】:

    猜你喜欢
    • 2012-07-25
    • 2013-12-25
    • 1970-01-01
    • 2013-07-12
    • 2020-04-20
    • 1970-01-01
    • 2019-03-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多