【问题标题】:Delay on condition有条件延迟
【发布时间】:2016-12-03 12:32:06
【问题描述】:

有一个代码:

someObservable.Select(x => getY(x));    
Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
    return new Y(2);
}

在某些情况下,我需要在一段时间后仔细检查 x.Value。 最简单但最糟糕的解决方案是使用 Thread.Sleep:

Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
  if (x.SomethingElse == true)
  {
     Thread.Sleep(timeout);
     if (x.Value == X.ABC)
       return new Y(1);
     else
       return new Y(2);
  }
}

这里的正确代码是什么? 我需要以与接收相同的方式订购事件。 这意味着如果我有一个延迟并且我得到一个新值,它必须等待处理。

【问题讨论】:

    标签: .net system.reactive reactivex


    【解决方案1】:

    解决方案(来自https://rsdn.org/forum/dotnet/6629370.1)是在 getY 中返回 IObservable 而不是 Y,并使用 Concat 生成 Observable.Delay。

    IObservable<Y> getY(X x)
    {
      if (x.Value == X.ABC)
        return Observable.Return(new Y(1));
      else
      if (x.SomethingElse == true)
      {
         return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
      }
    }
    

    IObservable<Y> getY(X x)
    {
      return Observable.Create<Y>(async (obs, token) =>
      {
        if (x.Value == X.ABC)
            obs.OnNext(new Y(1));
        else
        if (x.SomethingElse == true)
        {
            await Task.Delay(timeout, token);
            if (x.Value == X.ABC)
               obs.OnNext(new Y(1));
            else
               obs.OnNext(new Y(2));
         }
       }
     }
    

    然后:someObservable.Select(x => getY(x)).Concat();

    【讨论】:

    • 这行得通。我正在考虑使用SelectMany,它相当于Select,后跟Merge,它不会保留顺序。但是,此解决方案将正确地保持顺序。
    • 你最终会为每个值创建一个新的 observable。
    • 是的,它还允许我在 Rx 方式中进行短暂的延迟而不是长时间的延迟:Observable.Timer(0, 100ms).Take(tries).Select(x => ..).Where (x != null).Take(1);
    【解决方案2】:

    如果这是一个普遍的问题,我建议你实现一个自定义操作符。 delay 方法有一个重载,它接受另一个 observable 来控制延迟时间。示例中偶数立即推送,奇数延迟;

    Observable.Interval(TimeSpan.FromSeconds(1))
              .Delay(i => (i % 2 == 0) ? Observable.Return(0L) : Observable.Timer(TimeSpan.FromSeconds(0.9)))
              .Subscribe(Console.WriteLine);
    

    编辑: 下面是一个更简单的、保留顺序的延迟运算符:

      static IObservable<T> DelayOrdered<T>(this IObservable<T> observable, Func<T, TimeSpan> delaySelector, IScheduler scheduler = default(IScheduler))
            {
                scheduler = scheduler ?? DefaultScheduler.Instance;
                return Observable.Create<T>(observer =>
                {
                    var now = scheduler.Now;
    
                    return observable
                           .Subscribe(value =>
                    {
                        now = now.Add(delaySelector(value));
                        scheduler.Schedule(now, () => observer.OnNext(value));
                    });
                });
            }
    

    用法:

      Observable.Interval(TimeSpan.FromSeconds(0.2))
                          .DelayOrdered(i => (i % 2 == 0) ? TimeSpan.Zero : TimeSpan.FromSeconds(1))
                          .Subscribe(Console.WriteLine);
    

    订单将被保留,因为绝对预定时间只能增加。

    【讨论】:

    • 谢谢。我需要保持秩序的唯一问题。我在下面得到了一些解决方案的建议。你怎么看?
    • @NN_ 是的,您提出的解决方案应该保持秩序。不过,我认为您可以提高效率。
    猜你喜欢
    • 1970-01-01
    • 2018-10-17
    • 2014-11-06
    • 1970-01-01
    • 2019-09-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多