【问题标题】:Why is RefCount not working after all initial subscribers disconnect? (redux)为什么在所有初始订阅者断开连接后 RefCount 不起作用? (还原)
【发布时间】:2016-03-03 02:29:23
【问题描述】:

应 Lee Campbell 的要求,这是this original 的后续问题。它旨在在我试图解决的用例的上下文中提出问题。

我有一个 WebApiService,它封装了一个原始 Web API 并提供令牌管理。也就是说,它会跟踪身份验证令牌,并将其传递给原始 API。这是WebApiService 中的一种公共方法的示例:

public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
    this
        .EnsureAuthenticatedAsync()
        .SelectMany(
            _ =>
                this
                    .rawWebApi
                    .UpdateUserAsync(this.TokenValue, id, request));

您可以在转发到原始 Web API 之前调用 EnsureAuthenticatedAsync,使用 this.TokenValue 传递令牌。

EnsureAuthenticatedAsync 方法如下所示:

public IObservable<Unit> EnsureAuthenticatedAsync() =>
    this
        .Token
        .FirstAsync()
        .SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));

我最初的问题是由我尝试编写身份验证管道(上面的this.authenticate)引起的。请注意,这是用单个 observable 替换整个 EnsureAuthenticatedAsync 方法的第一步。

对于authenticate,我想要一个可观察的:

  1. 在有人订阅之前什么都不做(冷/懒)
  2. 它只工作一次,即使一次有多个订阅者
  3. 如果所有订阅者都断开连接,它会再次工作

为此,我想出了这样的东西:

this.authenticate = Observable
    .Defer(() =>
        Observable
            .Create<Unit>(
                async (observer, cancellationToken) =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var result = await this
                            .authenticationService
                            .AuthenticateAsync(this);

                        if (result.WasSuccessful)
                        {
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();

                            return;
                        }
                    }
                }))
    .Publish()
    .RefCount();

这里的想法是允许任意数量的同时调用WebApiService 方法以导致执行单个 身份验证循环。一旦通过身份验证,所有订阅者都将完成,任何未来的订阅者都意味着我们需要再次重新验证,从而重新执行延迟的 observable。

当然,上述 observable 与我最初问题中的简化问题存在相同的问题:一旦 deferred observable 完成一次,Publish 将立即完成任何未来的 observable(尽管重新请求了 deferred observable)。

如上所述,我的最终目标是用一个仅在令牌为空时执行此身份验证的管道完全替换 EnsureAuthenticatedAsync。但那是第二步,我失败了:)

所以回到最初的问题:有没有一种方法可以编写一个管道,无论当前订阅者的数量如何,它都会执行一次,但如果所有订阅者断开连接并再次重新连接,则会再次执行?

【问题讨论】:

    标签: c# .net system.reactive


    【解决方案1】:

    可观察序列不能多次完成。您想要在这里删除OnCompleted 调用,以便authenticate 不能完成多次,并将.Take(1) 添加到EnsureAuthenticatedAsync 以便订阅authenticate 将一个值后完成。

    下面是一个工作控制台应用程序。用obs 替换对obs1(具有Take)的引用以重现您的问题。在这两种情况下,您都可以快速按 Enter 以使所有四个订阅者都获得相同的值。

    class Program
    {
        static int value = 0;
    
        static void Main(string[] args)
        {
            var obs = Observable.Create<int>(observer =>
            {
                Console.WriteLine("Generating");
    
                Interlocked.Increment(ref value);
    
                return Observable.Return(value)
                    .Delay(TimeSpan.FromSeconds(1))
                    .Subscribe(observer);
            })
            .Publish() 
            .RefCount();
    
            var obs1 = obs.Take(1);
    
            obs1.Subscribe(
                i => Console.WriteLine("First {0}", i), 
                () => Console.WriteLine("First complete"));
            obs1.Subscribe(
                i => Console.WriteLine("Second {0}", i), 
                () => Console.WriteLine("Second complete"));
    
            Console.ReadLine();
    
            obs1.Subscribe(
                i => Console.WriteLine("Third {0}", i), 
                () => Console.WriteLine("Third complete"));
            obs1.Subscribe(
                i => Console.WriteLine("Fourth {0}", i), 
                () => Console.WriteLine("Fourth complete"));
    
            Console.WriteLine("Press enter to exit");
            Console.ReadLine();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-16
      • 2017-11-18
      • 2022-01-18
      • 2023-03-29
      • 1970-01-01
      • 2021-11-26
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多