【问题标题】:Reactive extension timer反应式扩展定时器
【发布时间】:2013-03-22 23:45:20
【问题描述】:

我有一个 HashSet。有时,新值会添加到此哈希集中。我想做的是让计时器在添加后一分钟从集合中删除每个元素。

我还是 rx 的新手,但这似乎是使用它的理想场合。

我尝试过这样的事情:

AddItem(string item)
{
  _mySet.Add(item);
  var timer = Observable.Timer(TimeSpan.FromSeconds(60), _scheduler);
  timer
      .Take(1)
      .Do(item => RemoveItem(item))
      .Subscribe(_ => Console.WriteLine("Removed {0}", item));
}

它似乎工作正常(通过单元测试)。

有人认为这种方法有什么问题吗?

【问题讨论】:

  • 您期望 AddItem("foo"); 的行为是什么? .. 30 秒后 AddItem("foo"); ?

标签: c# system.reactive


【解决方案1】:
  1. Do 调用中的 lambda 看起来不正确 - Observable.Timer 生成 int 值,但您的集合是 HashSet<string> - 这不应该编译。我猜这只是一个错字。

  2. Do:一般来说你的订阅应该在Subscribe完成。 Do 用于副作用(我不喜欢流中副作用的想法,所以我避免使用它,但它对调试很有用)。

  3. Take:Observable.Timer在终止前只产生一个值,因此不需要Take操作符

我会把你的函数写成:

AddItem(string item)
{
    _mySet.Add(item);
    Observable.Timer(TimeSpan.FromSeconds(60), _scheduler)
        .Subscribe(_ => RemoveItem(item));
}

【讨论】:

    【解决方案2】:

    您无需创建序列即可执行此操作。您已经是一个好公民并明确使用调度程序,所以就使用它!

    您可以将其用于您的代码

    AddItem(string item)
    {
      _mySet.Add(item);
      //Note this does return an IDisposable if you want to cancel the subscription.
      _scheduler.Schedule(
        TimeSpan.FromSeconds(60),
        ()=>
        { 
            RemoveItem(item);
            Console.WriteLine("Removed {0}", item);
        });
    }
    

    这基本上意味着幕后工作要少得多。考虑一下 Observable.Timer 方法正在进行的所有工作,实际上您只希望它执行一个带有值(您忽略)的 OnNext。

    我还假设即使对 Rx 一无所知的用户也能够阅读此计划代码。 IE。 “添加此项目后,我安排此删除操作在 60 秒内运行)。

    【讨论】:

    • 大声笑。我只是指出这与问题中的代码达到了相同的结果。问题不是问他如何切换到新库并“更实用”。正如 Flack 在他的问题中指出的那样,原始代码有效并通过了单元测试。我只是建议他不需要使用序列。
    【解决方案3】:

    如果你使用的是 ReactiveUI,一个名为 ReactiveCollection 的类肯定会在这里有所帮助,你可以这样使用它:

    theCollection.ItemsAdded
        .SelectMany(x => Observable.Timer(TimeSpan.FromSeconds(60), _scheduler).Select(_ => x))
        .Subscribe(x => theCollection.Remove(x));
    

    【讨论】:

      【解决方案4】:

      对不起,不是要挑剔你,而是:

      总是丢弃 IDISPOSABLES!!!!!

      (编辑:好的,不知道今天早上我在咖啡里放了什么,但我回答了一大堆废话;我会留下上面的只是因为一般,你确实想确保处理任何IDisposable,但为了弥补接下来的喋喋不休......)

      Subscribe 的调用会创建一个您不会处理的订阅,因此对该方法的多次调用只会排队越来越多的垃圾 - 现在在这种特定情况下,自从Timer 以来,这并不是世界末日只触发一次,但仍然...丢弃!

      如果你真的想使用这种方法(我认为更好的方法是让一些正在运行的线程/任务“倾向于”你的值,在它认为有必要时删除),至少尝试类似于:

      好吧,忽略所有那些被淘汰的废话。 Observable.Timer的实现是这样的:

      public static IObservable<long> Timer(TimeSpan dueTime)
      {
          return s_impl.Timer(dueTime);
      }
      

      这反过来又调用了这个:

      public virtual IObservable<long> Timer(TimeSpan dueTime)
      {
          return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
      }
      

      调用...

      private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
      {
          return new Timer(dueTime, null, scheduler);
      }
      

      这就是事情变得有趣的地方 - TimerProducer&lt;long&gt;,其中的肉是:

      private IDisposable InvokeStart(IScheduler self, object state)
      {
          this._pendingTickCount = 1;
          SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
          this._periodic = disposable;
          disposable.Disposable = self.SchedulePeriodic<long>(1L, this._period, new Func<long, long>(this.Tock));
          try
          {
              base._observer.OnNext(0L);
          }
          catch (Exception exception)
          {
              disposable.Dispose();
              exception.Throw();
          }
          if (Interlocked.Decrement(ref this._pendingTickCount) > 0)
          {
              SingleAssignmentDisposable disposable2 = new SingleAssignmentDisposable {
                  Disposable = self.Schedule<long>(1L, new Action<long, Action<long>>(this.CatchUp))
              };
              return new CompositeDisposable(2) { disposable, disposable2 };
          }
          return disposable;
      }
      

      现在,base._observer.OnNext,这是设置为在计时器滴答时触发的内部接收器,其中Invoke 是:

      private void Invoke()
      {
          base._observer.OnNext(0L);
          base._observer.OnCompleted();
          base.Dispose();
      }
      

      所以是的。它会自动处理自己 - 不会有任何“挥之不去的订阅”浮动。

      嗯....乌鸦很好吃。 :|

      【讨论】:

      • 嘿JerKimball,我以为Take(1) 满足后,源自动处理掉了。如果我添加一个 OnCompleted 处理程序,我会看到它被调用。调用 OnCompleted 时,源是否被释放?
      • @JerKimbal,这是不正确的建议,请删除。 IDisposable 在 Rx 的上下文中需要处理。 Disposing 在 Rx 中具有特殊含义,用于提前断开订阅。没有内存泄漏,示例代码没有问题。
      • 对不起@JerKimball,你通常对 Rx 很好,但 Paul 和 Flack 是正确的 - 一旦序列完成,订阅会自动“处理”(取消订阅)。
      • 天哪,我很抱歉;我不知道我到底在想什么......你当然是对的 - 让我看看我是否可以在这里通过一些编辑恢复一些面子......*facepalm*
      • @AlexG 我很惭愧。我责怪严重的头痛和最近的调试深入地狱。 ;)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-02-10
      • 2011-08-27
      • 2012-01-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多