【发布时间】:2010-11-02 22:35:27
【问题描述】:
我正在使用 Rx 中的 BufferWithTime() 来批量处理消息。如果我的 OnNext 方法的完成时间比使用的时间间隔长,我是否可以同时调用两个 OnNext 方法?
换句话说,在 BufferWithTime() 调用中指定的时间间隔是否会以绝对值计算,还是会在我的 OnNext 方法返回后充当“异步睡眠”?
【问题讨论】:
标签: .net concurrency system.reactive
我正在使用 Rx 中的 BufferWithTime() 来批量处理消息。如果我的 OnNext 方法的完成时间比使用的时间间隔长,我是否可以同时调用两个 OnNext 方法?
换句话说,在 BufferWithTime() 调用中指定的时间间隔是否会以绝对值计算,还是会在我的 OnNext 方法返回后充当“异步睡眠”?
【问题讨论】:
标签: .net concurrency system.reactive
不,对OnNext 的呼叫将排队。下面的代码在缓冲时间过去时写入跟踪,以及对订阅者的 OnNext 调用的开始/结束(其中它休眠了缓冲时间的 4 倍):
Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
.Take(50)
.BufferWithTime(TimeSpan.FromMilliseconds(100))
.Do(_ => Trace.WriteLine("Buffer elapsed"))
.ObserveOn(Scheduler.TaskPool)
.Subscribe(_ =>
{
Trace.WriteLine("Begin OnNext");
Thread.Sleep(200);
Trace.WriteLine("End OnNext");
});
输出如下。您可以看到 Begin/End OnNext 永远不会重叠,即使 Buffer elapsed 在 OnNext 调用期间出现两次:
缓冲区已用
开始下一步
缓冲区已用
缓冲区已用
结束 OnNext
开始下一步
缓冲区已用
缓冲区已用
缓冲区已用
结束 OnNext
开始下一步
结束 OnNext
开始下一步
结束 OnNext
开始下一步
结束 OnNext
开始下一步
结束 OnNext
【讨论】: