【发布时间】:2015-05-20 14:40:21
【问题描述】:
我是 .net 中的 Rx 新手,但已经开始使用它并成功完成了一些网络通信
非常简单的例子:
IObservable<Result> SendRequest(Request request)
{
return Observable.Create<Result>(observer =>
{
NetworkComms.SendReqeust(request,
result =>
{
observer.OnNext(result);
observer.OnCompleted();
});
return Disposable.Empty;
}).SubscribeOn(_commsScheduler);
}
我遇到的问题是,在调用者订阅返回的 IObservable 之前,实际上并未发送命令 (NetworkComms.SendRequest)。在某些情况下,请求是即发即弃的命令,结果毫无意义,因此调用者实际订阅返回的 IObservable 毫无意义。
我需要的功能是:
- 命令会立即发送,即使调用者从未订阅 IObservable
- 当客户端订阅时,即使延迟订阅也会得到结果
- 该命令只发送一次,但所有订阅都应该得到相同的结果。
我尝试使用 .Replay().RefCount() 执行此操作,并在返回 IObservable 之前在内部执行 Subscribe()。这几乎可以工作,但是如果客户端在收到结果后订阅(因此在完成时自动处理序列之后),它会导致再次调用订阅代码,发送命令第二次。
是否有一个简单的 Rx 扩展可以为我处理这种情况,还是我需要自己动手?
【问题讨论】:
-
这是一个非常奇怪的组成这个 observable 的东西。你的意思是说我会不时地运行这个请求,直到时间结束(或应用程序关闭时)返回结果给我,不管它有多陈旧。
-
你也不应该这样做
return Disposable.Empty;。如果你这样做了,那么你可能犯了一个错误——你通常是在进行阻塞订阅(你就是这样)。您最好将订阅返回到Observable.Start(() => { ... }).Subscribe(observer)。
标签: c# .net system.reactive