【问题标题】:Notify an observable to complete when request has finished请求完成时通知可观察对象完成
【发布时间】:2021-05-01 15:55:09
【问题描述】:

我对响应式编程还比较陌生,即使在阅读了 Rx.Net in Action 之后,我仍然在努力理解它。

我有一个服务类,它有两个公共方法 GetGetStream

GetStream 将创建一个 observable,它将连接 storeupdate observable,因此消费者可以毫无问题地接收未来的更新。

update observable 每分钟执行一次来自providers 的符号更新 - 轮询 api 请求

Get 将创建一个订阅来自store 的项目的可观察对象,并与正在更新的update 可观察对象if 连接。问题是一旦更新完成,没有OnComplete 被调用并且消费者无限期挂起。我将这个 observable 用作地方的阻塞调用和其他地方的非阻塞调用。

我想我可以通过ResetEvent 侥幸逃脱,但这似乎不是一种非常被动的处理方式。有更好的反应替代方案吗?也许我正试图在一个应该只返回IEnumerable 的地方拟合一个可观察的模式?

目标是让 observable 每隔指定的时间间隔更新存储。有一种方法可以获取商店的当前状态,包括当前更新。有另一种方法来获取商店的状态和任何未来的更新。

这是我目前需要的代码的 sn-p。

public class SymbolService : ISymbolService, IDisposable
{
    private readonly ILogger _logger = Logger.Create(nameof(SymbolService));
    
    private readonly IObservable<(string Provider, Symbol Symbol)> _update;

    private readonly ISymbolStore _store;

    private readonly IDisposable _token;

    private bool _updating = true;

    public SymbolService(SymbolOptions options, ISymbolStore store,
        IEnumerable<ISymbolProvider> providers)
    {
        _store = store;

        // Request and aggregate symbols from providers and put
        // into store.
        var source = providers
            .Select(provider => provider
                .GetSymbols()
                .Select(symbol => (provider.Name, symbol))
                .Take(10))
            .Merge()
            .Do(entry =>
                {
                    _updating = true;
                    _store.Put(entry);
                },
                () =>
                    _updating = false)
            .Log(_logger, "Symbol");

        // Perform update of symbols every interval
        var update = Observable
            .Timer(TimeSpan.Zero, options.RefreshInterval)
            .SelectMany(_ => source)
            .Publish();

        _update = update;

        _token = update
            .Connect();
    }

    /// <summary>
    /// Gets all symbols for a given provider.
    /// </summary>
    /// <param name="provider">The specified provider.</param>
    /// <returns>The observable stream.</returns>
    public IObservable<Symbol> Get(string provider)
    {
        // Get symbols from store by provider and concat stream
        // if update is being performed
        return _store
            .Get(provider)
            .Concat(Observable
                .If(() => _updating, _update
                    .TakeWhile(_ => _updating)
                    .Where(x => x.Provider.Equals(provider))
                    .Select(x => x.Symbol)));
    }

    /// <summary>
    /// Gets all symbols and future symbols for a given provider.
    /// </summary>
    /// <param name="provider">The specified provider.</param>
    /// <returns>The observable stream.</returns>
    public IObservable<Symbol> GetStream(string provider)
    {
        // Get symbols from store by provider and concat stream
        // of all future symbols
        return _store
            .Get(provider)
            .Concat(_update
                .Where(x => x.Provider.Equals(provider))
                .Select(x => x.Symbol))
            .Distinct();
    }

    public void Dispose()
    {
        _token?.Dispose();
    }
}

非常感谢您对此的任何帮助。

【问题讨论】:

    标签: c# functional-programming observable reactive


    【解决方案1】:

    再次浏览本书后,我认为BehaviourSubject 非常适合该解决方案。我只包含了回答我问题的相关代码 sn-p。

    订阅BehaviourSubject,当update 完成处理完成observable 并释放令牌时会收到通知。

        private readonly BehaviorSubject<bool> _updating;
    
        public SymbolService(SymbolOptions options, ISymbolStore store,
            IEnumerable<ISymbolProvider> providers)
        {
            _store = store;
    
            _updating = new BehaviorSubject<bool>(true);
    
            // Request and aggregate symbols from providers and put
            // into store.
            var source = providers
                .Select(provider => provider
                    .GetSymbols()
                    .Select(symbol => (provider.Name, symbol)))
                .Merge();
    
            // Perform update of symbols every interval
            var update = Observable
                .Timer(TimeSpan.Zero, options.RefreshInterval)
                .Log(_logger, "Updating", LogEventLevel.Information)
                .Do(_ => _updating.OnNext(true))
                .SelectMany(_ =>
                {
                    return source
                        .GetSymbols()
                        .Do(_store.Put)
                        .Finally(() => _updating.OnNext(false));
                })
                .Publish();
    
            _update = update;
    
            _token = update
                .Connect();
        }
    
        public IObservable<Symbol> Get(string provider)
        {
            // Get symbols from store by provider and concat stream
            // if update is being performed
            return _store
                .Get(provider)
                .Concat(_update
                    .Where(x => x.Provider.Equals(provider))
                    .Select(x => x.Symbol))
                .TakeUntil(_updating.Where(x => x == false));
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-03
      • 1970-01-01
      相关资源
      最近更新 更多