编辑:感谢大理石图。我把它变成了一些测试用例。你是对的,我之前的解决方案是缺少计时器。我在这里补充说,在他将每条消息加倍的形式中,一个是立即发送,另一个是延迟发送。这是解决方案:
public static IObservable<TSource> ThrottleBy4<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.Publish(_val => Observable.Merge( // For every incoming item, create two items: One immediate, one delayed by the timeout time.
_val.Select(v => (value: v, isOriginal: true)),
_val.Select(v => (value: v, isOriginal: false)).Delay(timeout, scheduler)
))
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, t) => // Result function
{
// special handling for the initial state
if (prevVal.Timestamp == DateTimeOffset.MinValue)
return (prevVal, false);
if (t.isOriginal) // If an original value, only emit if the value changed.
return (prevVal, !comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)));
else // If a repeat value, only emit if the prevVal state is the same timestamp and value.
return (prevVal, comparer.Equals(keySelector(t.value.Value), keySelector(prevVal.Value)) && t.value.Timestamp == prevVal.Timestamp);
},
(prevVal, t) => t.isOriginal ? t.value : prevVal // State function. Only change state if the incoming item is an original value.
)
.Where(t => t.Item2)
.Select(t => t.Item1.Value);
}
这是测试代码:
TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(200.MsTicks(), Notification.CreateOnNext("A1")),
new Recorded<Notification<string>>(300.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("B1")),
new Recorded<Notification<string>>(800.MsTicks(), Notification.CreateOnNext("B2"))
);
var comparer = new FirstLetterComparer();
var target = source
.ThrottleBy4(s => s, TimeSpan.FromSeconds(1), comparer: comparer, scheduler: ts);
var expectedResults = ts.CreateHotObservable<string>(
new Recorded<Notification<string>>(500.MsTicks(), Notification.CreateOnNext("A2")),
new Recorded<Notification<string>>(1800.MsTicks(), Notification.CreateOnNext("B2"))
);
var observer = ts.CreateObserver<string>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
还有这些帮助类:
public class FirstLetterComparer : IEqualityComparer<string>
{
public bool Equals(string s1, string s2)
{
if (s1 == null && s2 == null)
return true;
if (s1 == null || s2 == null)
return false;
return (s1[0] == s2[0]);
}
public int GetHashCode(string s)
{
return s == null ? 0 : s[0].GetHashCode();
}
}
public static class X
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
上一个答案:
我发现您的解决方案可能存在两个问题(Timeout 问题除外):
- 使用
default(T) 作为令牌值会在某些时候绊倒你。例如,这将不允许0 通过IObservable<int>。
- 由于您使用了
prev 字段,您可能会遇到多订阅问题。多个订阅者将共享该字段,这可能导致竞争条件和错误行为。
您可以通过返回一个元组来解决这两个问题,一个带有是否为新的布尔值的值,一个带有该值的值:
public static IObservable<TSource> ThrottleBy2<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.StateSelect(default(TSource), (prevVal, newVal) => (!comparer.Equals(keySelector(prevVal), keySelector(newVal)), newVal), (_, newVal) => newVal)
.Where(t => t.Item1)
.Select(t => t.newVal)
.Timeout(timeout, scheduler)
.RetryWhen(ex => ex.OfType<TimeoutException>());
}
StateSelect 在这里做你想做的事:它维护一个状态(你之前在prev 字段中拥有的状态),并返回前面提到的元组。它看起来像这样:
public static IObservable<TResult> StateSelect<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, TResult> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
}
public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, IObservable<TResult>> resultSelector, Func<TState, TSource, TState> stateSelector)
{
return source
.Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
.SelectMany(t => t.Item2);
}
这仍然留下两个小问题:
-
Timeout 问题
- 如果第一个真值为
default(TSource),则使用default(TSource) 作为初始状态可能会导致问题。
我们可以通过引入时间戳来解决这两个问题:
public static IObservable<TSource> ThrottleBy3<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, TimeSpan timeout,
IEqualityComparer<TKey> comparer = null, IScheduler scheduler = null)
{
comparer = comparer ?? EqualityComparer<TKey>.Default;
scheduler = scheduler ?? DefaultScheduler.Instance;
return source
.Timestamp(scheduler)
.StateSelect(Timestamped.Create(default(TSource), DateTimeOffset.MinValue),
(prevVal, newVal) => (!comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout, newVal),
(prevVal, newVal) => !comparer.Equals(keySelector(prevVal.Value), keySelector(newVal.Value)) || newVal.Timestamp - prevVal.Timestamp > timeout ? newVal : prevVal
)
.Where(t => t.Item1)
.Select(t => t.newVal.Value);
}
这里我们将时间戳值存储为状态,如果去抖动时间足够或值发生变化,我们会更改状态。结果又是一个元组,指示该值是否应该继续,以及带时间戳的值。
希望这会有所帮助。