【问题标题】:can I do independent subscribe/publish with Reactive Extensions?我可以使用 Reactive Extensions 进行独立订阅/发布吗?
【发布时间】:2012-04-18 20:08:33
【问题描述】:

假设我有一些 MyClass 类。在我的代码的一部分中,我想要这样的东西:

Observable.Subscribe<MyClass>(myClass => DoSomething(myClass));

然后在另一个地方(文件/项目/时间)我有这样的东西:

Observable.Publish(instanceOfMyClass);

第二行会触发所有使用该确切类类型订阅的方法。这是响应式扩展(v1 或 v2)支持的东西吗?

将 SynchronizationContext 指定为订阅调用的一部分会很有用。在那里指定方法是否应该使用 Wea​​kReference 也是很好的。并且 Publish 方法应该能够同步完成这一切,或者给我一些可以等待的东西。

【问题讨论】:

    标签: c# system.reactive publish-subscribe


    【解决方案1】:

    这并不难创建。

    您只需要一个内部的Dictionary&lt;Type, Object&gt; 并使用它来存储Type 的每个Subject&lt;T&gt;(作为对象)。

    然后您可以编写两个 SubscribePublish 方法来处理内部字典。

    其实应该很简单。


    与其说简单,我还想试试看。

    这是我的 Rx Pub/Sub 类:

    public static class RxPS
    {
        private static Dictionary<Type, object> _subjects
            = new Dictionary<Type, object>();
    
        public static IDisposable Subscribe<T>(Action<T> observer)
        {
            lock(_subjects)
            {
                if (!_subjects.ContainsKey(typeof(T)))
                {
                    _subjects.Add(typeof(T), new Subject<T>());
                }
                return (_subjects[typeof(T)] as Subject<T>)
                    .Subscribe(observer);
            }
        }
    
        public static void Publish<T>(T item)
        {
            lock(_subjects)
            {
                if (_subjects.ContainsKey(typeof(T)))
                {
                    (_subjects[typeof(T)] as Subject<T>)
                        .OnNext(item);
                }
            }
        }
    }
    

    这就是它的使用方式:

    RxPS.Publish(1);
    var d = RxPS.Subscribe<int>(x => Console.WriteLine(x)); 
    RxPS.Publish(2);
    d.Dispose();
    RxPS.Publish(3);
    

    结果是这段代码只会将2写入控制台。

    享受吧!

    【讨论】:

    • 向我解释“OnNext”?这会触发多个订阅者吗?如果我让“d”超出范围会发生什么?当垃圾收集器“轮回”时它会处理吗?
    • 调用OnNext 会为主题提供其自身可观察序列的下一个值,该值将发送给主题的所有订阅者。如果您让d 超出范围,则不会发生任何事情。 GC 将收集变量,但它从不调用Dispose,因此它不会取消任何订阅。调用Dispose 始终是程序员的责任。在 Rx 的情况下,如果您想提前取消订阅,您只需调用 Dispose,即在自然调用 OnCompletedOnError 之前。
    【解决方案2】:

    我相信您正在寻找像 ReactiveUI 的 MessageBus class 这样的东西。这个类使用 Rx 来实现发布/订阅模型,基本上只需要一个 Dictionary of Type => IObservables。

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多