我假设您想要以下行为:
- 在初始事件之后,将所有事件缓冲接下来的 10 秒。
- 一旦该 10 秒窗口关闭,下一个窗口应在 10 秒后为所有事件触发一个新的 10 秒缓冲区。
假设我们有 5 个事件在 5 秒内均匀分布,间隔 13 秒,然后另外 5 个事件在 5 秒内均匀分布。大理石图看起来像这样:
timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return
直接使用 Buffer 的问题在于,它看起来像上面提到的 stdbuff,并将第二组事件分成两组,从而为第二组事件生成两个列表:一个有三个事件,一个有两个事件。您需要一个列表(用于第二组),使用类似desired 流的逻辑。从 0 开始捕获,从 10 返回列表。从 17 开始捕获,从 27 返回列表。
如果我(再次)误解了你,请发布一个类似于上面的大理石图,表示你希望事情如何工作。
假设我理解正确,下面的代码将起作用...
//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));
//Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));
initialSource
.Publish( _source => _source
.Buffer(_source
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
)
)
.Subscribe(list =>
{
Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
Debug.WriteLine($"List Count: {list.Count}");
});
解释:
首先,我们需要识别“主要事件”,即代表上述desired 流描述中的BeginCapture 注释的事件。可以这样找到:
var primaryEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged();
一旦我们有了BeginCapture 事件,它可以表示窗口打开,就很容易找到Return 事件或窗口关闭:
var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));
在实践中,由于我们关心的关闭和打开之间没有发生任何事情,我们真的只需要担心关闭事件,因此我们可以将其缩小为:
var closeEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10));
将其插入Buffer,closeEvents 是bufferBoundaries:
var bufferredLists = initialSource
.Buffer(initialsource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
);
最后,由于我们有多个订阅 initialSource,我们将希望使用 Publish 来确保并发正常工作,从而得出最终答案。