【发布时间】:2013-11-22 13:55:27
【问题描述】:
我有一个旧的基于事件的对象,它似乎非常适合 RX:在连接到网络源后,它会在收到消息时引发事件,并且可能会因单个错误而终止(连接中断等) .) 或(很少)表明将不再有消息。这个对象也有几个预测——大多数用户只对接收到的消息的一个子集感兴趣,所以只有当众所周知的消息子类型出现时才会引发备用事件。
因此,在学习更多关于反应式编程的过程中,我构建了以下包装器:
class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage>
{
private LegacyType _Legacy;
private IConnectableObservable<TopLevelMessage> _Impl;
public LegacyReactiveWrapper(LegacyType t)
{
_Legacy = t;
var observable = Observable.Create<TopLevelMessage>((observer) =>
{
LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted();
_Legacy.TopLevelMessage += tlmHandler;
_Legacy.Error += errHandler;
_Legacy.Complete += doneHandler;
return Disposable.Create(() =>
{
_Legacy.TopLevelMessage -= tlmHandler;
_Legacy.Error -= errHandler;
_Legacy.Complete -= doneHandler;
});
});
_Impl = observable.Publish();
}
public IDisposable Subscribe(IObserver<TopLevelMessage> observer)
{
return _Impl.RefCount().Subscribe(observer);
}
public IDisposable Connect()
{
_Legacy.ConnectToMessageSource();
return Disposable.Create(() => _Legacy.DisconnectFromMessageSource());
}
public IObservable<SubMessageA> MessageA
{
get
{
// This is the moral equivalent of the projection behavior
// that already exists in the legacy type. We don't hook
// the LegacyType.MessageA event directly.
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageA)
.Select((tlm) => tlm.SubMessageA);
}
}
public IObservable<SubMessageB> MessageB
{
get
{
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageB)
.Select((tlm) => tlm.SubMessageB);
}
}
}
它在其他地方的使用感觉……有点……不过……不知何故。这是示例用法,它有效但感觉很奇怪。我的测试应用程序的 UI 上下文是 WinForms,但这并不重要。
// in Program.Main...
MainForm frm = new MainForm();
// Updates the UI based on a stream of SubMessageA's
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm);
LegacyType lt = new LegacyType();
// ... setup lt...
var w = new LegacyReactiveWrapper(lt);
var uiUpdateSubscription = (from msgA in w.MessageA
where SomeCondition(msgA)
select msgA).ObserveOn(frm).Subscribe(uiManager);
var nonUiSubscription = (from msgB in w.MessageB
where msgB.SubType == MessageBType.SomeSubType
select msgB).Subscribe(
m => Console.WriteLine("Got MsgB: {0}", m),
ex => Console.WriteLine("MsgB error: {0}", ex.Message),
() => Console.WriteLine("MsgB complete")
);
IDisposable unsubscribeAtExit = null;
frm.Load += (sender, e) =>
{
var connectionSubscription = w.Connect();
unsubscribeAtExit = new CompositeDisposable(
uiUpdateSubscription,
nonUiSubscription,
connectionSubscription);
};
frm.FormClosing += (sender, e) =>
{
if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); }
};
Application.Run(frm);
这个工作——表单启动,UI 更新,当我关闭它时,订阅被清理并且进程退出(如果 LegacyType 的网络连接仍然连接,它不会这样做)。严格来说,只处理connectionSubscription 就足够了。然而,明确的Connect 让我感觉很奇怪。由于RefCount 应该为您执行此操作,因此我尝试修改包装器,而不是在MessageA 和MessageB 中使用_Impl.RefCount 并稍后显式连接,而是使用this.RefCount 并将调用移至@ 987654330@ 到Load 处理程序。那有一个不同的问题——第二次订阅触发了对LegacyReactiveWrapper.Connect的另一个调用。我认为Publish/RefCount 背后的想法是“先入触发连接,后出处理连接”。
我想我有三个问题:
- 我是否从根本上误解了
Publish/RefCount? - 是否有一些优选的方式来实现
IConnectableObservable<T>,而不涉及委托给通过IObservable<T>.Publish获得的方式?我知道你不应该自己实现IObservable<T>,但我不明白如何将连接逻辑注入Observable.Create().Publish()给你的IConnectableObservable<T>。Connect应该是幂等的吗? - 更熟悉 RX/反应式编程的人是否会查看示例以了解包装器的使用方式并说“这很丑陋且损坏”,或者这不像看起来那么奇怪?
【问题讨论】:
标签: c#-4.0 system.reactive reactive-programming