【问题标题】:Replay subject subscription behaviour重播主题订阅行为
【发布时间】:2023-03-22 15:27:01
【问题描述】:

以下代码按预期工作,但我对取消注释行 'o.OnCompleted();' 时的行为方式感到困惑

代码将所有订阅者连接到单个长操作的结果,并将结果缓存 2 秒以供其他订阅者使用。在此时间之后的任何订阅都会重新开始该过程。

订阅将来自其他线程(用线程池模拟)。

        var obs = Observable.Create((IObserver<Guid> o) =>
        {
            Console.WriteLine("Start");
            Thread.Sleep(1000); // process
            Console.WriteLine("End");
            o.OnNext(Guid.NewGuid());
            //o.OnCompleted(); // <-- uncomment this
            return Disposable.Empty;
        })
        .Replay(TimeSpan.FromSeconds(2))
        .RefCount()
        .Take(1);

        ThreadPool.QueueUserWorkItem(delegate
        {
            // simulate request from threadpool
            obs.Subscribe(x => Console.WriteLine($"1: {x}"), () => Console.WriteLine($"1: complete"));
        });

        ThreadPool.QueueUserWorkItem(delegate
        {
            obs.Subscribe(x => Console.WriteLine($"2: {x}"), () => Console.WriteLine($"2: complete"));
        });

        Thread.Sleep(4000);

        ThreadPool.QueueUserWorkItem(delegate
        {
            obs.Subscribe(x => Console.WriteLine($"3: {x}"), () => Console.WriteLine($"3: complete"));
        });

结果如下:

Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: 1214DC63-F688-475A-9CB7-C3784054A4AC
3: complete

奇怪的行为是,如果我取消注释 'o.OnCompleted()' 行,结果会变成这样:

Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: complete

第三个订阅者导致另一个订阅根 observable 但结果丢失。看起来 ReplaySubject 缓存了前一个 observable 已完成的结果,但仍会导致新的订阅。这似乎不直观。我想了解为什么它不起作用。

注意:我最初使用 Defer 而不是 Create 进行了尝试,这与上面第二次运行的结果相同(原因很明显)。

【问题讨论】:

    标签: system.reactive


    【解决方案1】:

    当您使用 Replay/RefCount 对时,您创建了一个共享对源 observable 的公共订阅的 observable。

    来源:

    返回一个可连接的可观察序列,该序列共享一个对重放所有通知的底层序列的订阅。

    现在,重要的是要记住,一个 observable 会产生一系列零个或多个值,然后是完整或错误信号。产生完整或错误后无法产生值。

    由于您共享对源的共同订阅,并且如果您的源产生完整的源,那么它就不能产生更多的值。因此,当您致电 o.OnCompleted() 时,您就是这样做的。

    另外,作为旁注,您应该避免在Create 中写入return Disposable.Empty;。这意味着您正在创建一个可在订阅返回之前完成的可观察对象,这可能会导致竞争条件。

    不用它来编写代码的方法是:

    var obs =
        Observable
            .Defer(() => Observable.Return(Guid.NewGuid()).Concat(Observable.Never<Guid>()))
            .Replay(TimeSpan.FromSeconds(2.0))
            .RefCount()
            .Take(1);
    

    但这和不打电话给o.OnCompleted()是一样的。

    【讨论】:

    • 我想我需要重新表述一下我的问题:如果Replay对Observable有一次订阅,为什么超时后有订阅又执行订阅动作?
    • 关于“Disposable.Empty”的使用,我从这里得到了建议:introtorx.com/Content/v1.0.10621.0/20_Disposables.html
    • @DavidJ - 显然有一种重新订阅的机制 - 对 Replay 的底层调用正在调用 source.Multicast(new ReplaySubject&lt;TSource&gt;(window));,所以你必须追踪那个兔子洞才能看到它通向哪里。跨度>
    • @DavidJ - 该链接没有提供关于何时应该使用Disposable.Empty 的建议,只是在什么情况下可以使用。我的建议是在 99.99% 的情况下避免使用它。这会导致坏事。
    • 我现在正在查看 Observable.Create 的代码。看来,无论我返回什么,返回值都会被包裹在一个 Disposable 中。如果我返回“null”,则将其更改为 Disposable.Empty。如果我返回一个 Action,那么它会被包装在 Disposable.Create 中 - 如果我返回一个无操作操作,它与返回 Disposable.Empty 相同。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-17
    • 2016-06-10
    • 2014-09-12
    相关资源
    最近更新 更多