感谢您提出一个非常有趣的问题。我对此进行了尝试 - 开始安排未来的行动 - 虽然我设法达到了预期的输出,但我的解决方案存在重大问题。
你的更干净,但是……嗯……错了。好吧,稍微;0)
我首先使用 Microsoft 的 TestScheduler 编写了以下测试夹具:
[Fact]
public void MatchExpected()
{
TestScheduler scheduler = new TestScheduler();
// 0 1 2 3 4
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX <- Input
IObservable<char> input = scheduler.CreateColdObservable(
ReactiveTest.OnNext(1, 'a'),
ReactiveTest.OnNext(9, 'b'),
ReactiveTest.OnNext(11, 'c'),
ReactiveTest.OnNext(12, 'd'),
ReactiveTest.OnNext(18, 'e'),
ReactiveTest.OnNext(28, 'f'),
ReactiveTest.OnNext(34, 'g'),
ReactiveTest.OnNext(35, 'h'),
ReactiveTest.OnCompleted<char>(36)
);
// 0 1 2 3 4
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX <- Input
// a-------b---c---d---e------f-----g---hX <- Expected
var expected = new []
{
ReactiveTest.OnNext(ReactiveTest.Subscribed + 1, 'a'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 9, 'b'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 13, 'c'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 17, 'd'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 21, 'e'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 28, 'f'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 34, 'g'),
ReactiveTest.OnNext(ReactiveTest.Subscribed + 38, 'h'),
ReactiveTest.OnCompleted<char>(ReactiveTest.Subscribed + 38)
};
var actual = scheduler.Start(() => input.Separate(TimeSpan.FromTicks(4), scheduler), ReactiveTest.Subscribed + 40);
Assert.Equal(expected, actual.Messages.ToArray());
}
在此您可以看到输入和预期输出的大理石图(使用您的原始破折号表示法)。不幸的是,在使用您的实现时,您会收到以下输出:
// 0 1 2 3 4
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX <- Input
// a-------b---c---d---e------f-----g---hX <- Expected
// -a-------b--c---d---e-------f-----g--hX <- Actual
你看,使用 observable 来结束延迟的 Delay 重载需要调度器上的时间,然后 observable 才能发出值。不幸的是,在应该立即发出该值的情况下 (x.delay == TimeSpan.Zero),由于调度程序的循环,它实际上是稍后发出的。
由于我有测试夹具并且您有可行的解决方案,我想我会发回一个更正的版本,如下所示:
public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
{
return Observable.Create<T>(
observer =>
{
var timedSource = source
.Timestamp(scheduler)
.Scan(
new
{
value = default(T),
time = DateTimeOffset.MinValue,
delay = TimeSpan.Zero
},
(acc, item) =>
{
var time =
item.Timestamp - acc.time >= separation
? item.Timestamp
: acc.time.Add(separation);
return new
{
value = item.Value,
time,
delay = time - item.Timestamp
};
})
.Publish();
var combinedSource = Observable.Merge(
timedSource.Where(x => x.delay == TimeSpan.Zero),
timedSource.Where(x => x.delay > TimeSpan.Zero).Delay(x => Observable.Timer(x.delay, scheduler))
);
return new CompositeDisposable(
combinedSource.Select(x => x.value).Subscribe(observer),
timedSource.Connect()
);
}
);
}
提供预期输出:
// 0 1 2 3 4
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX <- Input
// a-------b---c---d---e------f-----g---hX <- Expected
// a-------b---c---d---e------f-----g---hX <- Actual
注意添加了IScheduler 参数,它在操作员代码中使用。当在 Rx 中实现任何可能引入并发的操作符(就像这个一样)时,这是一个很好的做法,它允许您编写(非常严格的)测试!
所以你去。希望对您有所帮助:0)