【问题标题】:RX IObserver subscription timingRX IObserver 订阅时间
【发布时间】:2015-05-20 14:40:21
【问题描述】:

我是 .net 中的 Rx 新手,但已经开始使用它并成功完成了一些网络通信

非常简单的例子:

    IObservable<Result> SendRequest(Request request)
    {
        return Observable.Create<Result>(observer =>
        {
            NetworkComms.SendReqeust(request,
                result =>
                {
                    observer.OnNext(result);
                    observer.OnCompleted();
                });
            return Disposable.Empty;
        }).SubscribeOn(_commsScheduler);
    }

我遇到的问题是,在调用者订阅返回的 IObservable 之前,实际上并未发送命令 (NetworkComms.SendRequest)。在某些情况下,请求是即发即弃的命令,结果毫无意义,因此调用者实际订阅返回的 IObservable 毫无意义。

我需要的功能是:

  1. 命令会立即发送,即使调用者从未订阅 IObservable
  2. 当客户端订阅时,即使延迟订阅也会得到结果
  3. 该命令只发送一次,但所有订阅都应该得到相同的结果。

我尝试使用 .Replay().RefCount() 执行此操作,并在返回 IObservable 之前在内部执行 Subscribe()。这几乎可以工作,但是如果客户端在收到结果后订阅(因此在完成时自动处理序列之后),它会导致再次调用订阅代码,发送命令第二次。

是否有一个简单的 Rx 扩展可以为我处理这种情况,还是我需要自己动手?

【问题讨论】:

  • 这是一个非常奇怪的组成这个 observable 的东西。你的意思是说我会不时地运行这个请求,直到时间结束(或应用程序关闭时)返回结果给我,不管它有多陈旧。
  • 你也不应该这样做return Disposable.Empty;。如果你这样做了,那么你可能犯了一个错误——你通常是在进行阻塞订阅(你就是这样)。您最好将订阅返回到Observable.Start(() =&gt; { ... }).Subscribe(observer)

标签: c# .net system.reactive


【解决方案1】:

您似乎想使用AsyncSubject&lt;T&gt;

IObservable<Result> SendRequest(Request request)
{
    var subject = new AsyncSubject<Result>();
    NetworkComms.SendReqeust(request, result =>
    {
        subject.OnNext(result);
        subject.OnCompleted();
    });
    return subject.AsObservable();
}

我应该补充一点,您想要的界面有点奇怪。如果语义是“在我调用此方法的那一刻,就发出了一个请求,任何想要得到响应的人都可以在以后得到响应”,那么请考虑只使用Task&lt;Result&gt; 而不是IObservable&lt;Result&gt;

Task<Result> SendRequestAsync(Request request)
{
    var tcs = new TaskCompletionSource<Result>();
    NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
    return tcs.Task;
}

此外,考虑将此Task&lt;Result&gt; SendRequestAsync(Request request) 方法直接放在您的NetworkComms 类中。

【讨论】:

  • 啊,谢谢。我看过 Subject 类,但我读过的东西说它们不应该真正用于生产代码,所以我没有进一步考虑它们。看起来这是要走的路。奇数接口的要点是即使没有订阅结果也会发生操作,但如果订阅了结果(即使在操作完成后),仍然可以获得结果。结果将被多次订阅不是预期的情况,但我想明确一点,这不会仅仅为了完整性而重新触发操作。
  • @Andy (注意我编辑使用AsyncSubject&lt;T&gt; 而不是ReplaySubject&lt;T&gt; - 前者在这里更合适,因为序列只包含一个值。)“主题不应该用于生产代码”是一个笼统的说法。人们不喜欢主题的原因是它们是有状态的。但是,如果您需要做的是有状态的事情,那么它们是合适的。只是要小心使用它们。 (请参阅我在最后一段中关于替代方案的建议。)
  • @Andy 多次订阅返回的 observable 不会多次调用网络调用。订阅没有副作用。对 SendRequestAsync 的调用具有所有副作用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多