【问题标题】:How to debounce until value changed (with timeout)?如何去抖动直到值改变(超时)?
【发布时间】:2019-10-22 14:59:35
【问题描述】:

我需要一个 RX 操作来消除流中的元素直到值发生变化。如果值在一段时间内没有变化,它还必须支持发出最后一个元素的超时。

t 标记超时

DistinctUntilChanged 有点相似,但我想要最后一个相同的项目,而不是第一个。我尝试使用 BufferGroupBy 并选择组中的最后一个,但我需要在每个元素上重置计时器,以确保在选择最后一个之前该组包含所有相等的元素。

我做了一个使用 TimeoutRetry 的实现,但我对每次发生超时都必须重新订阅源不太满意,因为这可能不适合所有场景/源(即冷可观察)。不过,似乎可以与我测试过的热门 observables 一起工作。

public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout, IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
    comparer ??= EqualityComparer<TKey>.Default;
    scheduler ??= DefaultScheduler.Instance;

    var prev = default(TSource);
    return source
        .StartWith(default(TSource))
        .Select(e =>
        {
            var ret = !comparer.Equals(keySelector(prev), keySelector(e)) ? prev : default;
            prev = e;
            return ret;
        })
        .Where(e => !Equals(e, default(TSource)))
        .Timeout(timeout, scheduler)
        .RetryWhen(ex => ex.OfType<TimeoutException>());
}

由于 Throttle 在 Rx.NET 中的工作方式,决定将其称为 ThrottleBy 而不是 DebounceBy。

关于如何/应该如何实施这样的操作有什么想法吗?

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    编辑:感谢大理石图。我把它变成了一些测试用例。你是对的,我之前的解决方案是缺少计时器。我在这里补充说,在他将每条消息加倍的形式中,一个是立即发送,另一个是延迟发送。这是解决方案:

    public static IObservable<TSource> ThrottleBy4<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
        IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
    {
        comparer = comparer ?? EqualityComparer<TKey>.Default;
        scheduler = scheduler ?? DefaultScheduler.Instance;
    
        return source
            .Timestamp(scheduler)
            .Publish(_val => Observable.Merge(  // For every incoming item, create two items: One immediate, one delayed by the timeout time.
                _val.Select(v => (value: v, isOriginal: true)),
                _val.Select(v => (value: v, isOriginal: false)).Delay(timeout, scheduler)
            ))
            .StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
                (prevVal, t) =>         // Result function
                {
                    // special handling for the initial state
                    if (prevVal.Timestamp == DateTimeOffset.MinValue)
                        return (prevVal, false);
    
                    if (t.isOriginal)   // If an original value, only emit if the value changed.
                        return (prevVal, !comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)));
                    else                // If a repeat value, only emit if the prevVal state is the same timestamp and value.
                        return (prevVal, comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)) && t.value.Timestamp == prevVal.Timestamp);
                },
                (prevVal, t) => t.isOriginal ? t.value : prevVal        // State function. Only change state if the incoming item is an original value.
            )
            .Where(t => t.Item2)
            .Select(t => t.Item1.Value);
    }
    

    这是测试代码:

    TestScheduler ts = new TestScheduler();
    var source = ts.CreateHotObservable<string>(
        new Recorded<Notification<string>>(200.MsTicks(), Notification.CreateOnNext("A1")),
        new Recorded<Notification<string>>(300.MsTicks(), Notification.CreateOnNext("A2")),
        new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("B1")),
        new Recorded<Notification<string>>(800.MsTicks(), Notification.CreateOnNext("B2"))
    );
    
    var comparer = new FirstLetterComparer();
    var target = source
        .ThrottleBy4(s => s, TimeSpan.FromSeconds(1), comparer: comparer, scheduler: ts);
    
    var expectedResults = ts.CreateHotObservable<string>(
        new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("A2")),
        new Recorded<Notification<string>>(1800.MsTicks(), Notification.CreateOnNext("B2"))
    );
    
    var observer = ts.CreateObserver<string>();
    target.Subscribe(observer);
    ts.Start();
    
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
    

    还有这些帮助类:

    public class FirstLetterComparer : IEqualityComparer<string>
    {
        public bool Equals(string s1, string s2)
        {
            if (s1 == null && s2 == null) 
                return true;
            if (s1 == null || s2 == null)
                return false;
            return (s1[0] == s2[0]);
        }
    
        public int GetHashCode(string s)
        {
            return s == null ? 0 : s[0].GetHashCode();
        }
    }
    
    public static class X
    {
        public static long MsTicks(this int i)
        {
            return TimeSpan.FromMilliseconds(i).Ticks;
        }
    }
    

    上一个答案:

    我发现您的解决方案可能存在两个问题(Timeout 问题除外):

    1. 使用default(T) 作为令牌值会在某些时候绊倒你。例如,这将不允许0 通过IObservable&lt;int&gt;
    2. 由于您使用了prev 字段,您可能会遇到多订阅问题。多个订阅者将共享该字段,这可能导致竞争条件和错误行为。

    您可以通过返回一个元组来解决这两个问题,一个带有是否为新的布尔值的值,一个带有该值的值:

        public static IObservable<TSource> ThrottleBy2<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
        IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
    {
        comparer = comparer ?? EqualityComparer<TKey>.Default;
        scheduler = scheduler ?? DefaultScheduler.Instance;
    
        return source
            .StateSelect(default(TSource), (prevVal, newVal) => (!comparer.Equals(keySelector(prevVal), keySelector(newVal)), newVal), (_, newVal) => newVal)
            .Where(t => t.Item1)
            .Select(t => t.newVal)
            .Timeout(timeout, scheduler)
            .RetryWhen(ex => ex.OfType<TimeoutException>());
    }
    

    StateSelect 在这里做你想做的事:它维护一个状态(你之前在prev 字段中拥有的状态),并返回前面提到的元组。它看起来像这样:

    public static IObservable<TResult> StateSelect<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
        Func<TState, TSource, TResult> resultSelector, Func<TState, TSource, TState> stateSelector)
    {
        return source
            .StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
    }
    
    public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState, 
        Func<TState, TSource, IObservable<TResult>> resultSelector, Func<TState, TSource, TState> stateSelector)
    {
        return source
            .Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
            .SelectMany(t => t.Item2);
    }
    

    这仍然留下两个小问题:

    1. Timeout 问题
    2. 如果第一个真值为default(TSource),则使用default(TSource) 作为初始状态可能会导致问题。

    我们可以通过引入时间戳来解决这两个问题:

    public static IObservable<TSource> ThrottleBy3<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
        IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
    {
        comparer = comparer ?? EqualityComparer<TKey>.Default;
        scheduler = scheduler ?? DefaultScheduler.Instance;
    
        return source
            .Timestamp(scheduler)
            .StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue), 
                (prevVal, newVal) => (!comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout, newVal), 
                (prevVal, newVal) => !comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout ? newVal : prevVal
            )
            .Where(t => t.Item1)
            .Select(t => t.newVal.Value);
    }
    

    这里我们将时间戳值存储为状态,如果去抖动时间足够或值发生变化,我们会更改状态。结果又是一个元组,指示该值是否应该继续,以及带时间戳的值。

    希望这会有所帮助。

    【讨论】:

    • 非常感谢您的输入,您有一些有效的观点!尽管没有计时器,但我看不出您的第二种选择如何工作。它需要在超时发生后发出最后看到的元素。我肯定会采纳你的改进,但我真的希望摆脱超时/重试部分,所以它还不是 100%...
    • 经过一些测试,我发现您的替代实现都没有与我的代码一样工作。我将添加一个弹珠图来可视化所需的行为。
    • 更新答案。
    • 好,我想现在已经很接近了!只是在超时后下一个元素到达时,重复超时的元素。在您的测试中,尝试在时间 2000(在“B2”超时之后)添加一个“C1”元素。在这种情况下,“B2”在超时时发出,然后在“C1”到达时再次发出。有没有办法存储超时发出的状态?
    • 查看我添加的答案。它非常基于您的想法,但已简化并针对上述问题进行了处理。
    【解决方案2】:

    感谢 Shlomo 提供的非常好的帮助,我想我现在对这个问题有了一个很好的解决方案:

    public static IObservable<TSource> ThrottleBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
    IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
    {
        comparer = comparer ?? EqualityComparer<TKey>.Default;
        scheduler = scheduler ?? DefaultScheduler.Instance;
    
        return source
            .Publish(_val => Observable.Merge(
                _val.Select(v => (value: v, timeout: false)),
                _val.Select(v => (value: v, timeout: true)).Throttle(timeout, scheduler)
            ))
            .Scan((prev: (value: (object)null, timeout: false), emit: (object)null), (state, t) => {
                if (state.prev.value == null) // Initial state
                    return (t, null); // Save new state and ignore
    
                // Emit previous in case of timeout or value changed
                if (t.timeout || (!state.prev.timeout && !comparer.Equals(keySelector(t.value), keySelector((TSource)state.prev.value))))
                    return (t, state.prev.value);
    
                // Save new state and ignore
                return (t, null);
            })
            .Where(x => x.emit != null)
            .Select(x => (TSource)x.emit);
    }
    

    这很像 Shlomo 的建议,但我最终使用了 Throttle 而不是 Delay,这使它更简单。合并到辅助方法中以使其自包含。我将值装箱以避免default(TSource)

    【讨论】:

      猜你喜欢
      • 2022-01-13
      • 1970-01-01
      • 2021-11-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多