【问题标题】:Replay timestamped event stream with Reactive Extensions使用 Reactive Extensions 重放带时间戳的事件流
【发布时间】:2018-02-18 15:35:19
【问题描述】:

我收集了以下类别的物品:

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

我想创建IObservable<Event>,其中每个项目在将来Timestamp 时发布。 Observable.Delay 可以做到这一点,还是我必须编写自己的 IObservable<T> 实现?

我会提到这个结构有点像日志文件。 Event 可能有数万个项目,但每秒只发布 1-2 个。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    事实证明,Observable.Delay 重载占用可变时间非常简单:

    //given IEnumerable<Event> events:
    var observable = events.ToObservable().Delay(ev => Observable.Timer(ev.Timestamp));
    

    【讨论】:

      【解决方案2】:

      虽然我的第一个答案按预期工作,但创建可观察序列的性能对于数十万个事件并不理想 - 您需要支付大量初始化成本(在我的机器上大约 10 秒)。

      为了提高性能,利用我的数据已经排序的性质,我实现了自定义IEnumerable&lt;Event&gt;,它循环遍历事件,在它们之间产生和休眠。有了这个IEnumerable,人们可以很容易地调用ToObservable&lt;T&gt;,它可以按预期工作:

      IObservable<Event> CreateSimulation(IEnumerable<Event> events)
      {
           IEnumerable<Event> simulation()
           {
               foreach(var ev in events)
               {
                   var now = DateTime.UtcNow;
      
                   if(ev.Timestamp > now)
                   {
                       Thread.Sleep(ev.Timestamp - now);
                   }
      
                   yield return ev;          
              }
          }
      
          return simulation().ToObservable();
      }
      

      【讨论】:

        【解决方案3】:

        似乎 Rx 库缺少将 IEnumerable&lt;T&gt; 转换为 IObservable&lt;T&gt; 的机制,方法是惰性枚举它并时移其元素。下面是一个自定义实现。这个想法是Zip 与一个主题可枚举的源,并通过在适当的时刻向主题发送OnNext 通知来控制枚举。

        /// <summary>Converts an enumerable sequence to a time shifted observable sequence,
        /// based on a time selector function for each element.</summary>
        public static IObservable<T> ToObservable<T>(
            this IEnumerable<T> source,
            Func<T, DateTimeOffset> dueTimeSelector,
            IScheduler scheduler = null)
        {
            scheduler ??= Scheduler.Default;
            return Observable.Defer(() =>
            {
                var subject = new BehaviorSubject<Unit>(default);
                return subject
                    .Zip(source, (_, x) => x)
                    .Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
                    .Do(_ => subject.OnNext(default));
            });
        }
        

        之所以选择BehaviorSubject,是因为它有一个初始值,因此它可以让轮子自然地运动。

        Observable.Defer 运算符用于防止多个订阅共享同一状态(本例中为 subject)并相互干扰。有关此here 的更多信息。

        使用示例:

        IEnumerable<Event> events = GetEvents();
        
        IObservable<Event> observable = events.ToObservable(x => x.Timestamp);
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2023-03-05
          • 1970-01-01
          相关资源
          最近更新 更多