【问题标题】:How to have events separated at least by a given time span?如何将事件至少分开给定的时间跨度?
【发布时间】:2016-10-26 10:24:57
【问题描述】:

我希望输出序列的事件尽快发生,但不是在从最新事件开始的 N 秒窗口内。

这是一个大理石图,假设我想在事件之间至少分隔三个破折号:

Input:  a-------b-cd-----e---------f-----g-h
Result: a-------b---c---d---e------f-----g---h

签名是:

IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation);

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    严重依赖@ibebbs 的答案,我已经使用测试来查看是否有更简单的方法。

    就在我编写代码之前,我看到了一些已编码到测试中的假设。但是,我不知道是否需要这些断言。特别是 OnCompleted 时间。 @ibebbs 断言 OnCompleted 应该与最后一个值出现在同一帧中。 OP没有提出这样的要求。

    如果这不是要求,那么您可以采取完全不同的方法。

    当我看到你的大理石图时,我从 Input 到 Result 的心理翻译如下

    Input:  a-------b-cd-----e---------f-----g-h
            a---|
                    b--|
                        c--|
                            d--|
                                e--|
    Result: a-------b---c---d---e------f-----g---h
    

    即每个值都被投影到一个新的具有长尾的单值序列。也就是说,直到给定的缓冲时间它才会完成。这使得代码就像从单个值到具有单个值和延迟完成的序列的投影一样简单。然后你只需将所有这些迷你序列连接在一起

    public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
    {
        var delayedEmpty = Observable.Empty<T>().Delay(separation, scheduler);
        return source.Select(s=>
                Observable.Return(s).Concat(delayedEmpty)
            ).Concat();
    }
    

    这将解决 OP,但是您还将在完成为每个值获得的序列时获得相同的缓冲区。

    【讨论】:

    • 谢谢,又学到了一件事:你可以通过连接空的延迟来构建延迟,这非常有趣!我将此标记为更简单和适合我的账单的答案。如果你能应付最后的延迟直到完成,这是最好的。否则请参阅@ibebbs 一。
    • 伟大的解决方案李。这实际上与我走的路非常相似,但我的实施直接依赖于时间表,而不是使用现有的操作员,并且留下了很多一次性用品。谢谢指点。
    • NP。在使用测试来验证您的 Rx 解决方案方面做得很好。下一步是依靠大理石图来直观地构建解决方案。
    【解决方案2】:

    感谢您提出一个非常有趣的问题。我对此进行了尝试 - 开始安排未来的行动 - 虽然我设法达到了预期的输出,但我的解决方案存在重大问题。

    你的更干净,但是……嗯……错了。好吧,稍微;0)

    我首先使用 Microsoft 的 TestScheduler 编写了以下测试夹具:

    [Fact]
    public void MatchExpected()
    {
        TestScheduler scheduler = new TestScheduler();
    
        // 0        1         2         3         4 
        // 1234567890123456789012345678901234567890
        // a-------b-cd-----e---------f-----ghX     <- Input
        IObservable<char> input = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(1, 'a'),
            ReactiveTest.OnNext(9, 'b'),
            ReactiveTest.OnNext(11, 'c'),
            ReactiveTest.OnNext(12, 'd'),
            ReactiveTest.OnNext(18, 'e'),
            ReactiveTest.OnNext(28, 'f'),
            ReactiveTest.OnNext(34, 'g'),
            ReactiveTest.OnNext(35, 'h'),
            ReactiveTest.OnCompleted<char>(36)
        );
    
        // 0        1         2         3         4 
        // 1234567890123456789012345678901234567890
        // a-------b-cd-----e---------f-----ghX     <- Input
        // a-------b---c---d---e------f-----g---hX  <- Expected
        var expected = new []
        {
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 1, 'a'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 9, 'b'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 13, 'c'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 17, 'd'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 21, 'e'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 28, 'f'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 34, 'g'),
            ReactiveTest.OnNext(ReactiveTest.Subscribed + 38, 'h'),
            ReactiveTest.OnCompleted<char>(ReactiveTest.Subscribed + 38)
        };            
    
        var actual = scheduler.Start(() => input.Separate(TimeSpan.FromTicks(4), scheduler), ReactiveTest.Subscribed + 40);
    
        Assert.Equal(expected, actual.Messages.ToArray());
    }
    

    在此您可以看到输入和预期输出的大理石图(使用您的原始破折号表示法)。不幸的是,在使用您的实现时,您会收到以下输出:

    // 0        1         2         3         4 
    // 1234567890123456789012345678901234567890
    // a-------b-cd-----e---------f-----ghX     <- Input
    // a-------b---c---d---e------f-----g---hX  <- Expected
    // -a-------b--c---d---e-------f-----g--hX  <- Actual
    

    你看,使用 observable 来结束延迟的 Delay 重载需要调度器上的时间,然后 observable 才能发出值。不幸的是,在应该立即发出该值的情况下 (x.delay == TimeSpan.Zero),由于调度程序的循环,它实际上是稍后发出的。

    由于我有测试夹具并且您有可行的解决方案,我想我会发回一个更正的版本,如下所示:

    public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
    {
        return Observable.Create<T>(
            observer =>
            {
                var timedSource = source
                    .Timestamp(scheduler)
                    .Scan(
                        new
                        {
                            value = default(T),
                            time = DateTimeOffset.MinValue,
                            delay = TimeSpan.Zero
                        },
                        (acc, item) =>
                        {
                            var time =
                                item.Timestamp - acc.time >= separation
                                ? item.Timestamp
                                : acc.time.Add(separation);
                            return new
                            {
                                value = item.Value,
                                time,
                                delay = time - item.Timestamp
                            };
                        })
                    .Publish();
    
                var combinedSource = Observable.Merge(
                    timedSource.Where(x => x.delay == TimeSpan.Zero),
                    timedSource.Where(x => x.delay > TimeSpan.Zero).Delay(x => Observable.Timer(x.delay, scheduler))
                );
    
                return new CompositeDisposable(
                    combinedSource.Select(x => x.value).Subscribe(observer),
                    timedSource.Connect()
                );
            }
        );
    }
    

    提供预期输出:

    // 0        1         2         3         4 
    // 1234567890123456789012345678901234567890
    // a-------b-cd-----e---------f-----ghX     <- Input
    // a-------b---c---d---e------f-----g---hX  <- Expected
    // a-------b---c---d---e------f-----g---hX  <- Actual
    

    注意添加了IScheduler 参数,它在操作员代码中使用。当在 Rx 中实现任何可能引入并发的操作符(就像这个一样)时,这是一个很好的做法,它允许您编写(非常严格的)测试!

    所以你去。希望对您有所帮助:0)

    【讨论】:

    • 感谢您提供更精确的版本!我不担心绝对精度,我唯一关心的是确保结果事件至少按分离时间间隔分开。
    【解决方案3】:

    简要说明:

    • Timestamp():为每个事件添加时间戳。
    • Scan():此函数聚合类似于Aggregate(),但生成部分聚合值的序列,而不仅仅是最后一项。它用于确定每个事件的所需时间戳,考虑到最后一个所需的时间戳,因此与原始时间戳的延迟。
    • Delay():它自己执行延迟(感谢https://twitter.com/AzazelN28,不知道这个过载!)
    • Select():再次获取原始值。
        public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation)
        {
            return source
                .Timestamp()
                .Scan(
                    new {
                        value = default(T),
                        time = DateTimeOffset.MinValue,
                        delay = TimeSpan.Zero },
                    (acc, item) =>
                    {
                        var time = 
                            item.Timestamp - acc.time >= separation
                            ? item.Timestamp
                            : acc.time.Add(separation);
                        return new
                        {
                            value = item.Value,
                            time,
                            delay = time - item.Timestamp
                        };
                    })
                .Delay(x => Observable.Timer(x.delay))
                .Select(x => x.value);
        }
    

    【讨论】:

      猜你喜欢
      • 2010-12-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-06-27
      相关资源
      最近更新 更多