【问题标题】:Observable that produces a single item per subscription - one event per handler每个订阅产生一个项目的 Observable - 每个处理程序一个事件
【发布时间】:2014-06-26 22:39:57
【问题描述】:

假设我想公开一个 observable 立即通知,如果 Internet 连接现在可用,或者如果设备未连接到 Internet,则通知将在可用时推出:

IObservable<DateTime> InternetBecameAvailableSignalledOncePerSubscriber { get; }

此外,每个订阅应该只有一个通知,而不需要订阅者执行 .Take(1) 或类似的操作。

即依赖于 Internet 资源的客户端将使用这个 observable 现在或 Internet 可用时立即执行某些操作,但不会多次执行此操作 - 如果 Internet 变得不可用且可用,则不会再向该订阅者发出信号第二次..

如何使用 Reactive Extensions (Rx) 实现这一点?

  • 请忽略互联网可以这样使用的有缺陷的概念,将其视为可访问 google.com 或您喜欢的任何风格的实现

【问题讨论】:

  • 您可以使用 .Aggregate 执行此操作 - 但在这种情况下,我建议您编写自己的类/包装器,因为您可能希望更好地控制订阅过程
  • 你绝对可以用 Rx 做到这一点,但是当只有一个通知时,通常异步方法更容易。

标签: c# .net linq system.reactive


【解决方案1】:

这应该可以通过 Rx 轻松解决。问题是您如何知道互联网是否可用?它是基于其他消费者的订阅,是基于另一种方法(如Connect() 方法)还是某个正在推送给您的事件(如 WCF 通道状态更改事件)?

根据这个答案,您似乎只需要封装您的 Take(1) 和 Replay(1)。

public class IServiceClient
{
    IObservable<DateTime> LastConnnected { get; }
}

public class ServiceClient : IServiceClient, IDisposable
{
    private readonly IDisposable _connection;
    private readonly IObservable<DateTime> _lastConnnected;

    public ServiceClient()
    {
        //Question 1) where does the 'Connected' sequence come from i.e. what is it that tells you that you have internet connectivity?
        //Question 2) When should the subscription be made to 'Connected'? Here I cheat and do it in the ctor, not great.
        var connected = Connected.Replay(1)
                                .Where(isConnected=>isConnected)
                                .Take(1)
                                .Select(_=>DateTime.UtcNow);

        _lastConnnected = connected;
        _connection = connected.Connect();
    }

    public IObservable<DateTime> LastConnnected{ get {return _lastConnnected; } }

    public void Dispose()
    {
        _connection.Dispose();
    }
}

这确实给您留下了一些其他问题需要回答,例如是什么告诉您您是否有 Internet 连接以及对此的资源管理计划是什么?

更新代码

public interface IServiceClient
{
    IObservable<DateTime> LastConnnected { get; }
}

public class ServiceClient : IServiceClient, IDisposable
{
    private readonly IDisposable _connection;
    private readonly IObservable<bool> _lastConnnected;

    public ServiceClient(IObservable<ConnectionState> connectedStates)
    {
        var cachedStates = connectedStates.Select(state=>state.IsConnected).Replay(1);
        _lastConnnected = cachedStates;
        _connection = cachedStates.Connect();
    }

    public IObservable<DateTime> LastConnnected
    { 
        get 
        {
            return _lastConnnected.StartWith(IsConnected())
                                  .Where(isConnected=>isConnected)
                                  .Take(1)
                                  .Select(_=>DateTime.UtcNow); 
        } 
    }

    //....

    public void Dispose()
    {
        _connection.Dispose();
    }
}

【讨论】:

  • 谢谢!我有一个方法 bool IsConnected() 可用于查看是否应立即推送一个值,如果为假,那么我有一个热 IObservable 流式传输断开连接和连接状态 - 我不太熟悉可连接的可观察对象,那么在给定这个源方法和序列的情况下,代码应该如何改变?
  • 这里我们说我们想要重放 1 个值,所以它应该缓存它看到的最后一个值。但是,这需要订阅才能执行此操作。在这种情况下,我们“连接”以调用订阅并开始缓存值。在您的情况下,这不会有太大作用,因为您的序列已经很热了。代码更新
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-04
  • 1970-01-01
  • 2011-05-31
  • 1970-01-01
相关资源
最近更新 更多