【问题标题】:Wrapping legacy object in IConnectableObservable在 IConnectableObservable 中包装遗留对象
【发布时间】:2013-11-22 13:55:27
【问题描述】:

我有一个旧的基于事件的对象,它似乎非常适合 RX:在连接到网络源后,它会在收到消息时引发事件,并且可能会因单个错误而终止(连接中断等) .) 或(很少)表明将不再有消息。这个对象也有几个预测——大多数用户只对接收到的消息的一个子集感兴趣,所以只有当众所周知的消息子类型出现时才会引发备用事件。

因此,在学习更多关于反应式编程的过程中,我构建了以下包装器:

class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage>
{
    private LegacyType _Legacy;
    private IConnectableObservable<TopLevelMessage> _Impl;
    public LegacyReactiveWrapper(LegacyType t) 
    {
        _Legacy = t;
        var observable = Observable.Create<TopLevelMessage>((observer) => 
        {
            LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
            LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
            LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted();

            _Legacy.TopLevelMessage += tlmHandler;
            _Legacy.Error += errHandler;
            _Legacy.Complete += doneHandler;

            return Disposable.Create(() => 
            {
                _Legacy.TopLevelMessage -= tlmHandler;
                _Legacy.Error -= errHandler;
                _Legacy.Complete -= doneHandler;
            });
        });

        _Impl = observable.Publish();
    }

    public IDisposable Subscribe(IObserver<TopLevelMessage> observer)
    {
        return _Impl.RefCount().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        _Legacy.ConnectToMessageSource();
        return Disposable.Create(() => _Legacy.DisconnectFromMessageSource());
    }

    public IObservable<SubMessageA> MessageA
    {
        get
        {
            // This is the moral equivalent of the projection behavior
            // that already exists in the legacy type. We don't hook
            // the LegacyType.MessageA event directly.
            return _Impl.RefCount()
                    .Where((tlm) => tlm.MessageType == MessageType.MessageA)
                    .Select((tlm) => tlm.SubMessageA);
        }
    }

    public IObservable<SubMessageB> MessageB
    {
        get
        {
            return _Impl.RefCount()
                    .Where((tlm) => tlm.MessageType == MessageType.MessageB)
                    .Select((tlm) => tlm.SubMessageB);
        }
    }
}

它在其他地方的使用感觉……有点……不过……不知何故。这是示例用法,它有效但感觉很奇怪。我的测试应用程序的 UI 上下文是 WinForms,但这并不重要。

// in Program.Main... 

MainForm frm = new MainForm();

// Updates the UI based on a stream of SubMessageA's
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm);

LegacyType lt = new LegacyType();
// ... setup lt...

var w = new LegacyReactiveWrapper(lt);

var uiUpdateSubscription = (from msgA in w.MessageA
                            where SomeCondition(msgA)
                            select msgA).ObserveOn(frm).Subscribe(uiManager);

var nonUiSubscription = (from msgB in w.MessageB
                         where msgB.SubType == MessageBType.SomeSubType
                         select msgB).Subscribe(
                             m => Console.WriteLine("Got MsgB: {0}", m),
                             ex => Console.WriteLine("MsgB error: {0}", ex.Message),
                             () => Console.WriteLine("MsgB complete")
                         );

IDisposable unsubscribeAtExit = null;
frm.Load += (sender, e) => 
{
    var connectionSubscription = w.Connect();
    unsubscribeAtExit = new CompositeDisposable(
                               uiUpdateSubscription,
                               nonUiSubscription,
                               connectionSubscription);
};

frm.FormClosing += (sender, e) => 
{
    if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); }
};


Application.Run(frm);

这个工作——表单启动,UI 更新,当我关闭它时,订阅被清理并且进程退出(如果 LegacyType 的网络连接仍然连接,它不会这样做)。严格来说,只处理connectionSubscription 就足够了。然而,明确的Connect 让我感觉很奇怪。由于RefCount 应该为您执行此操作,因此我尝试修改包装器,而不是在MessageAMessageB 中使用_Impl.RefCount 并稍后显式连接,而是使用this.RefCount 并将调用移至@ 987654330@ 到Load 处理程序。那有一个不同的问题——第二次订阅触发了对LegacyReactiveWrapper.Connect的另一个调用。我认为Publish/RefCount 背后的想法是“先入触发连接,后出处理连接”。

我想我有三个问题:

  1. 我是否从根本上误解了Publish/RefCount
  2. 是否有一些优选的方式来实现IConnectableObservable&lt;T&gt;,而不涉及委托给通过IObservable&lt;T&gt;.Publish 获得的方式?我知道你不应该自己实现IObservable&lt;T&gt;,但我不明白如何将连接逻辑注入Observable.Create().Publish() 给你的IConnectableObservable&lt;T&gt;Connect 应该是幂等的吗?
  3. 更熟悉 RX/反应式编程的人是否会查看示例以了解包装器的使用方式并说“这很丑陋且损坏”,或者这不像看起来那么奇怪?

【问题讨论】:

    标签: c#-4.0 system.reactive reactive-programming


    【解决方案1】:

    我不确定您是否需要像以前那样直接公开 Connect。我将简化如下,使用Publish().RefCount() 作为封装的实现细节;这将导致仅在需要时进行旧式连接。在这里,第一个订户导致连接,最后一个订户导致断开连接。另请注意,这正确地在所有订阅者之间共享一个 RefCount,而您的实现对每个消息类型使用 RefCount,这可能不是预期的。用户不需要明确连接:

    public class LegacyReactiveWrapper
    {
        private IObservable<TopLevelMessage> _legacyRx; 
    
        public LegacyReactiveWrapper(LegacyType legacy)
        {
            _legacyRx = WrapLegacy(legacy).Publish().RefCount();
        }
    
        private static IObservable<TopLevelMessage> WrapLegacy(LegacyType legacy)
        {
            return Observable.Create<TopLevelMessage>(observer =>
            {
                LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
                LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
                LegacyCompleteHandler doneHandler = sender => observer.OnCompleted();
    
                legacy.TopLevelMessage += tlmHandler;
                legacy.Error += errHandler;
                legacy.Complete += doneHandler;
                legacy.ConnectToMessageSource();
    
                return Disposable.Create(() =>
                {
                    legacy.DisconnectFromMessageSource();
                    legacy.TopLevelMessage -= tlmHandler;
                    legacy.Error -= errHandler;
                    legacy.Complete -= doneHandler;
                });
            });
        }
    
        public IObservable<TopLevelMessage> TopLevelMessage
        {
            get
            {
                return _legacyRx;
            }
        }
    
        public IObservable<SubMessageA> MessageA
        {
            get
            {
                return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageA)
                                .Select(tlm => tlm.SubMessageA);
            }
        }
    
        public IObservable<SubMessageB> MessageB
        {
            get
            {
                return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageB)
                                .Select(tlm => tlm.SubMessageB);
            }
        }
    }
    

    另一个观察结果是,Publish().RefCount() 将在其订阅者数量达到 0 时放弃基础订阅。通常,即使已发布源上的订阅者数量下降,我也只在需要维持订阅时才使用 Connect 而不是此选项为零(以后可能会再次回升)。但很少需要这样做 - 仅当连接比在您可能不需要的情况下保持订阅资源更昂贵时。

    【讨论】:

    • +1 for Publish().RefCount() 作为实现细节。太多次我看到默认情况下不执行此操作的事件包装器。 Rx 正在为您处理订阅,因此请确保您不会通过向遗留事件添加冗余事件处理程序来加倍处理它们! :)
    【解决方案2】:
    1. 您的理解并不完全错误,但您似乎确实存在一些误解。

      您似乎相信在同一源 IObservable 上多次调用 RefCount 将导致共享引用计数。他们不;每个实例都有自己的计数。因此,您将导致多次订阅 _Impl,每次调用订阅或调用 Message 属性一次。

      您还可能期望将_Impl 设置为IConnectableObservable 会以某种方式导致您的Connect 方法被调用(因为您似乎很惊讶需要在使用代码中调用 Connect)。 Publish 所做的所有事情都是使已发布对象的订阅者(从 .Publish() 调用返回)共享对底层源 observable 的单个订阅(在这种情况下,是通过调用 Observable.Create 生成的对象)。

      通常,我看到 Publish 和 RefCount 立即一起使用(例如 source.Publish().RefCount())来获得上述共享订阅效果或使冷的 observable 变热,而无需调用 Connect 来启动对原始源的订阅。但是,这依赖于对所有订阅者使用从 .Publish().RefCount() 返回的相同对象(如上所述)。

    2. 您对 Connect 的实现似乎是合理的。我不知道关于 Connect 是否应该是幂等的任何建议,但我个人并不期望它是。如果你想要它,你只需要跟踪对它的调用以及对它的返回值的处理以获得正确的平衡。

      我认为您不需要按照自己的方式使用 Publish,除非有某种原因可以避免将多个事件处理程序附加到旧对象。如果您确实需要避免这种情况,我建议将_Impl 更改为普通的IObservable,然后在Publish 后面加上RefCount

    3. 您的 MessageAMessageB 属性可能会导致用户混淆,因为它们返回一个 IObservable,但仍需要调用基础对象上的 Connect 才能开始接收消息。我要么将它们更改为以某种方式委托给原始 Connect 的 IConnectableObservables(此时幂等性讨论变得更加相关),要么删除属性并让用户在需要时自己进行(相当简单的)预测。

    【讨论】:

    • 核心问题是我误解了 RefCount 的工作原理。幂等性的问题与可能需要的解决方法有关,如果它按照我认为的方式工作,而不是它实际上是如何工作的。你有一个有效的点 3);我将它们包括在内是为了弄清楚与 Publish() 的连接共享是如何工作的,并且因为我们有很多代码关心“仅 A”或“仅 B”,尽管它们出现在同一个流中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-18
    • 2022-08-07
    相关资源
    最近更新 更多