【问题标题】:Why is RefCount not working after all initial subscribers disconnect?为什么在所有初始订阅者断开连接后 RefCount 不起作用?
【发布时间】:2016-03-01 07:10:11
【问题描述】:

考虑以下几点:

[Fact]
public void foo()
{
    var result = new Subject<bool>();
    var startCount = 0;
    var completionCount = 0;
    var obs = Observable
        .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
        .Do(_ => ++completionCount)
        .Publish()
        .RefCount();

    // pretend there are lots of subscribers at once
    var s1 = obs.Subscribe();
    var s2 = obs.Subscribe();
    var s3 = obs.Subscribe();

    // even so, we only expect to be started once
    Assert.Equal(1, startCount);
    Assert.Equal(0, completionCount);

    // and we won't complete until the result ticks through
    result.OnNext(true);
    Assert.Equal(1, startCount);
    Assert.Equal(1, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();

    // now try exactly the same thing again
    s1 = obs.Subscribe();
    s2 = obs.Subscribe();
    s3 = obs.Subscribe();

    // startCount is 4 here instead of the expected 2!
    Assert.Equal(2, startCount);
    Assert.Equal(1, completionCount);

    result.OnNext(true);
    Assert.Equal(2, startCount);
    Assert.Equal(2, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();
}

我对@9​​87654322@ + RefCount 的理解是,只要至少有一个订阅者,就会保持与源的连接。一旦最后一个订阅者断开连接,任何未来的订阅者都将重新启动与源的连接。

正如您在我的测试中看到的那样,第一次一切正常。但第二次,管道内的延迟 observable 会为每个新订阅者执行一次。

我可以通过调试器看到,对于第一组订阅者,obs._count(计算订阅者)在每次调用 Subscribe 时都会增加。但是对于第二组订阅者,它仍然是零。

为什么会发生这种情况,我可以做些什么来纠正我的管道?

【问题讨论】:

    标签: c# .net system.reactive


    【解决方案1】:

    @user631090 的答案很接近,但不正确,所以我想我会自己回答。

    这是因为如果Publish 发布的流本身已经完成,它将立即完成新订阅者。你可以在图中看到here

    但如果图表包含订阅者基础流完成之后会很好。

    为了增加混乱,Defer 仍被称为新订阅者。但由于初始流完成,它的返回值被Publish 简单地忽略了。

    我还没有想出一种方法来实现我的预期用例。我想也许使用Multicast 而不是Publish,根据需要创建一个新主题。但我还没有能够做到这一点。对于我认为常见的用例来说,这似乎相当痛苦。

    【讨论】:

    • Kent,您介意解释一下您的预期用例(在另一篇文章中)吗?也许社区可以在那里更直接地帮助你。 (可能会减少移动部分的数量:主题 + 延迟 + 第一 + 发布 + 引用计数并给出问题(不是错误)可能会让我们提供更多帮助。
    • 当然,李。我刚刚发布了这个问题作为后续问题:stackoverflow.com/questions/35762063/…
    【解决方案2】:

    这是因为底层的 observable result 已经完成了。所以每个新订阅者都只是得到 OnCompleted 回调。

    如果 ObservableDefer 每次都创建一个新序列或一个未完成的序列,您会看到所需的行为。

    例如

    return result.FirstAsync().Concat(Observable.Never<bool>());
    

    您需要删除Assert.Equal(1, completionCount);

    【讨论】:

    • 这听起来很合理,但我很难生成一个可以按预期工作的序列。我原以为return result.Take(1); 而不是return result.FirstAsync(); 会起作用,但我得到了相同的结果。很好奇。,
    • result完成。当一个新值被勾选时,对result.FirstAsync 的每个单独调用都将完成。
    • 我应该说 result.FirstAsync 已经完成。我同意这种行为很奇怪。看起来这是以某种方式捕获的,因此未来的订阅者可以取回已完成的 observable。您可以通过连接您的第二组订阅者 OnCompleted 事件来看到这一点,它们会立即触发,而不是等待您发送另一个 .OnNext。因此,每个新订阅者都会增加计数。
    • result.FirstAsync 也没有完成。即使我在Defer 处理程序中创建一个全新的Subject&lt;bool&gt;,我也会得到相同的行为。所以更像是Defer 被正确调用以获得下一个可观察对象,但随后它被忽略了。可能是Publish / RefCount 的错误?
    猜你喜欢
    • 1970-01-01
    • 2021-12-16
    • 2017-11-18
    • 2022-01-18
    • 2023-03-29
    • 1970-01-01
    • 2021-11-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多