【问题标题】:Rx: Wait for first item for a period of timeRx:等待第一个项目一段时间
【发布时间】:2017-10-02 11:11:43
【问题描述】:

我想将我遗留的基于事件的方法转换为基于可观察的方法,但我对 Rx 很陌生,所以我现在卡住了。

我有一个事件源,现在是可观察的。在某个时间点,我必须启动一个方法,该方法通过返回该行中的下一个元素或如果超时则返回 null。

基于事件的方法如下所示:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);

        EventHandler<ReaderEvent> localHandler = (o, e) =>
        {
            if (e.PlaceId == PlaceId)
            {
                result = e;
                cts.Cancel();
            }
        };

        ReaderEventHandler += localHandler;
        try
        {
            await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            //...
        }

        ReaderEventHandler -= localHandler;
    }

    return result;
}

如您所见,这个想法是延迟被我等待的事件的到来取消,或者令牌源在特定时间后被配置取消。很干净。

现在,Rx 版本:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        using (observable.Subscribe(x => {
            result = x;
            cts.Cancel();
        {
            try
            {
                await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { }
        }
    }
    return result;
}

不是那么干净……更糟糕的是…… 我也尝试过超时扩展。但由于这是一次性订阅,我仍然需要在处理订阅之前以某种方式等待。唯一的区别是 OnError 会取消本地令牌,而不是 CancelAfter 的内置机制。

是否有任何击球手/更简洁(更依赖 Rx)的方式来做到这一点?

谢谢!

【问题讨论】:

    标签: c# task-parallel-library system.reactive


    【解决方案1】:

    你可以试试:

    var values = await _OnReaderEvent
      .Where(r => r.PlaceId == placeId)
      .Buffer(waitFor, 1)
      .FirstAsync(); // get list of matching elements during waitFor time
    
    return values.FirstOrDefault(); // return first element or null if the list is empty
    

    【讨论】:

    • 好吧,在这种情况下,r 看起来是 IList,但反过来看起来很有希望:var values = await _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId)。缓冲区(等待);返回值。FirstOrDefault();尽管如此, Buffer() 似乎还是阻塞了。如果行中没有事件,它不会终止:(
    • 缓冲区返回 List,其中 T 是 Observable 项的类型。它是在 waitFor 时间窗口内生成的所有项目的列表,并且它应该在每个 waitFor 时返回空列表。如果您执行 Buffer -> FirstAsync 那么您将获得第一个列表。见msdn.microsoft.com/en-us/library/hh229813(v=vs.103).aspx
    • 啊……明白了。那么这就是我需要的: var values = await _OnReaderEvent.Where(r => r.PlaceId == PlaceId).Buffer(waitFor, 1).FirstAsync();返回值。FirstOrDefault();你为我指明了正确的方向!
    • 几乎 :) 您错过了 1 作为 Buffer 参数,这对于在第一个元素到达时立即返回很重要。不幸的是,没有一个方法接受取消令牌。
    • 您可以使用取消令牌包装所有内容,并在取消时处理订阅。
    【解决方案2】:

    为什么不直接使用简单的 Rx 版本的代码:

    public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        return await
            _OnReaderEvent
                .Where(r => r.PlaceId == PlaceId)
                .Buffer(waitFor, 1)
                .Select(xs => xs.FirstOrDefault())
                .FirstOrDefaultAsync()
                .ToTask();
    }
    

    【讨论】:

    • 它不会以这种方式终止。但是在 .ToTask() 之前添加一个 .FirstOrDefaultAsync() 使其工作。是的,这是最干净的。我可以无缝地添加我的取消令牌。谢谢!
    • @ZorgoZ - 取消令牌代码没有用,因为您实际上并没有取消底层的 observable。有点无意义。
    【解决方案3】:

    这个问题可以通过很多不同的方式来解决。这是一个,利用AmbReturnDelayFirstAsync 运算符:

    public Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        return _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            .Amb(Observable.Return(default(ReaderEvent)).Delay(waitFor))
            .FirstAsync()
            .ToTask();
    }
    

    _OnReaderEvent observable 完成或在等待期间完成的异常情况下,生成的Task 将转换为故障状态,但InvalidOperationException“序列不包含元素”除外。

    另一个实现,功能等同于前一个,使用Timeout 运算符:

    public Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        return _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            .FirstAsync()
            .Timeout(waitFor, Observable.Return(default(ReaderEvent)))
            .ToTask();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-21
      • 1970-01-01
      • 2021-08-26
      • 2014-12-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多