【问题标题】:Publish last value of event for new subscribers为新订阅者发布事件的最后一个值
【发布时间】:2015-08-17 13:00:35
【问题描述】:

我有一个课程 Foo 有一个发布 FooState 枚举的事件。我想把这个事件变成一个 observable,为新订阅者重放最后一个值。

即使没有订阅者,任何新订阅者都应该获得最后一个值。

public enum FooState
{
    Stopped = 0,
    Starting = 1,
    Running = 2,        
}

public delegate void FooEventHandler(object sender, FooEventArgs e);

public class FooEventArgs : EventArgs
{
    public FooEventArgs(FooState fooState)
    {
        this.State = fooState;
    }

    public FooState State {get; private set;}
}

public class Foo
{
    public event FooEventHandler FooEvent;

    public void OnFooEvent(FooState state)
    {
        var fooEvent = FooEvent;

        if(fooEvent != null)
        {
            fooEvent(this, new FooEventArgs(state));
        }
    }
}

到目前为止,我的尝试围绕着使用 PublishRefCountReplay。但是,如果我在触发事件后订阅 observable,我尝试的任何组合都不起作用。

Replay(1).RefCount() 只要至少有一个订阅就可以工作,但我还需要为第一个延迟订阅工作。

var foo = new Foo();

   var obs =  Observable.FromEventPattern<FooEventHandler, FooEventArgs>(
                                        h => foo.FooEvent += h,
                                        h => foo.FooEvent -= h)
                                    .DistinctUntilChanged()
                                    .Replay(1)
                                    .RefCount();

    // Works if this line is uncomented.
    //obs.Subscribe(x => Console.WriteLine("Early Subscriber = " + x.EventArgs.State));

    foo.OnFooEvent(FooState.Running);

    obs.Subscribe(x => Console.WriteLine("Late Subscriber = " + x.EventArgs.State));

有人知道如何用 Rx 做到这一点吗?

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    RefCount 仅在第一次订阅后连接。如果您想对连接发生的时间进行细粒度控制,您应该使用Replay + Connect

    那就这样做吧:

    var publishedSource = eventSource.DistinctUntilChanged().Replay(1);
    
    var connection = publishedSource.Connect();
    
    //Subscribe to publishedSource to receive events and dispose of 
    connection when you are done.
    

    从我的手机发帖,如有任何语法错误,请提前道歉。

    【讨论】:

    • 有没有办法在创建 observable 时自动Connect?也就是说,我不必单独管理Connect 订阅?
    • @NedStoyanov 您将如何确定何时清理流?基本上你怎么知道流何时结束?
    • @NedStoyanov - 您要么必须通过调用 .Connect() 或通过 .Subscribe(...) 进行连接。但是由于您想在订阅之间共享值,因此您别无选择,只能使用 .Connect()
    【解决方案2】:

    Rx 正在做正确的事情,将您的事件通知转换为您的流并重放它们,但您要问的是: “为什么我订阅事件时,没有得到初始状态”。

    事件不是这样工作的。如果我在 foo.FooEvent 上执行 +=,我不会立即触发当前值。我只有在它发生变化时才会收到通知。 如您所见,“重播”将重播后续事件,但不提供订阅时的状态。

    要解决您的问题,您需要确保在连接流以获取更改通知之前将当前值放入流中。 查看 Observable.StartWith()。

    即在 .DistinctUntilChanged() 调用之前执行“.StartWith(foo.State)”(紧接在 .FromEventPattern 之后)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-20
      相关资源
      最近更新 更多