【问题标题】:Count of observers in a Rx SubjectRx 主题中的观察者计数
【发布时间】:2017-05-06 16:39:36
【问题描述】:

使用 Rx,获取 Subject 中当前观察者数量的最佳方法是什么?

我有一个场景,我想发布一条消息,但前提是有观察者。如果没有观察者,我需要做点别的。

为了解决这个问题,我所做的是创建自己的 ISubject 实现并公开内部 IObserver 集合的计数。我确信必须有一种开箱即用的方式来做到这一点,我只是不完全熟悉 Rx 所提供的功能。

谢谢!

【问题讨论】:

  • 为什么要关心是否有观察者?
  • 我关心的不是我有一个观察者,而是关心我没有观察者的时候。在我的应用程序中,我有处理某些消息的窗口。他们是观察者。如果没有打开窗口,我会创建一个新窗口。所以我需要知道我什么时候没有观察者才能创建一个新窗口。
  • 这听起来像是你要放入观察者的逻辑(不管你的窗口管理是什么);将其放入您观察的事物中似乎是一个糟糕的设计决定。
  • 我在这里必须同意@casperOne。这通常应该以订阅构建管道的形式表示。

标签: system.reactive


【解决方案1】:

使用Subject<T>.HasObservers 属性。

Source Code

我不记得它是什么时候引入的,但我很确定它并不总是存在。它可能是在 Rx 2.0 中添加的。

【讨论】:

  • 这对我帮助很大。谢谢。
  • 有趣的是,如果你在 doOnUnsubscribe 中使用它,它总是会返回 true,afaict
【解决方案2】:

您应该尽可能避免实现自己的可观察(或主题)实现。

您当然可以尝试编写一个包装类来提供帮助。

试试这个:

public class Countable
{
    private int _count;
    public int Count { get { return _count; } }
    public IObservable<T> GetCountable<T>(IObservable<T> source)
    {
        return Observable.Create<T>(o =>
        {
            Interlocked.Increment(ref _count);
            var subscription = source.Subscribe(o);
            var decrement = Disposable.Create(() =>
            {
                Interlocked.Decrement(ref _count);
            });
            return new CompositeDisposable(subscription, decrement);
        });
    }
}

然后您可以编写如下代码:

var xs = new Subject<int>();
var countable = new Countable();
var ys = countable.GetCountable(xs);
Console.WriteLine(countable.Count);
var s1 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
var s2 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
s1.Dispose();
Console.WriteLine(countable.Count);
s2.Dispose();
Console.WriteLine(countable.Count);

我的运行结果是:

0
1
2
1
0

【讨论】:

    【解决方案3】:

    使用subject.observers.length,例如:

    import {Subject} from 'rxjs'
    
    let subject = new Subject()
    let s1 = subject.subscribe(v => console.log('observerA: ' + v))
    
    subject.next(1) // observerA: 1
    console.log(subject.observers.length) // 1
    
    
    let s2 = subject.subscribe(v => {
        console.log('observerB: ' + v)
        if(v===3) s2.unsubscribe()
    })
    
    subject.next(2) // observerA: 2
    console.log(subject.observers.length) // 2
    
    subject.next(3) // observerA: 3
    console.log(subject.observers.length) // 1
    

    【讨论】:

    • 我不知道这在 RxJS 中是否有效,但即使有效,它也不是标准 Rx 规范的一部分。如果我在 prod 中看到这个,我倾向于认为这是一个 hack。
    • 只是来这里说这是正确的解决方案(不是黑客,@forresthopkinsa)。这是手动输入:Subject.observers
    • 同样,这不是标准 Rx 规范的一部分。做 OP 想要做的事情的正确方法(如果有观察者则做 A,如果没有观察者则做 B)是使用.HasObservers
    猜你喜欢
    • 2011-09-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-12
    • 1970-01-01
    • 2011-11-25
    • 1970-01-01
    相关资源
    最近更新 更多