【问题标题】:RX buffer events for several seconds after first event is triggered触发第一个事件后几秒钟的 RX 缓冲区事件
【发布时间】:2016-08-26 14:34:16
【问题描述】:

我有一个文件观察器,我可以从中观察创建和更改的事件。 我希望当第一个事件被触发(创建或更改)时,它需要开始缓冲 10 秒,在这 10 秒之后我想处理缓冲的事件。

我已经得到的是这个:

Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Created")
                .Merge(Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Changed"))
                .Buffer(TimeSpan.FromSeconds(10))
                .Subscribe(list =>
                {
                   Debug.WriteLine("Do something");
                });

此代码执行 'Debug.WriteLine("Do something");'每 10 秒一次。

编辑: 好吧,让我试着用时间线来解释一下。

  1. 文件观察器处于空闲状态,未触发任何事件。
  2. 在一段未知的时间后,文件被放置在目录中
  3. 创建的事件被触发
  4. 可观察列表开始缓冲(所有事件)10 秒
  5. 在这 10 秒后,订阅操作被执行,它会立即处理所有事件

希望这会让事情变得清晰

【问题讨论】:

  • 您当前的代码有什么问题?为什么不消费list?还是要单独处理列表中的每个项目?
  • 你只希望它触发一次还是什么?听起来您的解决方案很合适。
  • 附注:您应该保留对订阅的引用,并在您不再对事件感兴趣时将其丢弃。如果您不处置订阅并且它超出范围,那么它可能仍会保留对事件的强引用(并可能导致泄漏)。
  • 相应地编辑了答案。

标签: c# system.reactive


【解决方案1】:

我假设您想要以下行为:

  1. 在初始事件之后,将所有事件缓冲接下来的 10 秒。
  2. 一旦该 10 秒窗口关闭,下一个窗口应在 10 秒后为所有事件触发一个新的 10 秒缓冲区。

假设我们有 5 个事件在 5 秒内均匀分布,间隔 13 秒,然后另外 5 个事件在 5 秒内均匀分布。大理石图看起来像这样:

timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events  : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return

直接使用 Buffer 的问题在于,它看起来像上面提到的 stdbuff,并将第二组事件分成两组,从而为第二组事件生成两个列表:一个有三个事件,一个有两个事件。您需要一个列表(用于第二组),使用类似desired 流的逻辑。从 0 开始捕获,从 10 返回列表。从 17 开始捕获,从 27 返回列表。

如果我(再次)误解了你,请发布一个类似于上面的大理石图,表示你希望事情如何工作。


假设我理解正确,下面的代码将起作用...

//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
//  .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));

    //Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
    .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));

initialSource
    .Publish( _source => _source 
        .Buffer(_source
            .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
            .DistinctUntilChanged()
            .Delay(TimeSpan.FromSeconds(10))
        )
    )
    .Subscribe(list =>
    {
        Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
        Debug.WriteLine($"List Count: {list.Count}");
    });

解释

首先,我们需要识别“主要事件”,即代表上述desired 流描述中的BeginCapture 注释的事件。可以这样找到:

 var primaryEvents = initialSource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged();

一旦我们有了BeginCapture 事件,它可以表示窗口打开,就很容易找到Return 事件或窗口关闭:

 var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));

在实践中,由于我们关心的关闭和打开之间没有发生任何事情,我们真的只需要担心关闭事件,因此我们可以将其缩小为:

 var closeEvents = initialSource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged()
        .Delay(TimeSpan.FromSeconds(10));

将其插入BuffercloseEventsbufferBoundaries

var bufferredLists = initialSource
    .Buffer(initialsource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged()
        .Delay(TimeSpan.FromSeconds(10))
    );

最后,由于我们有多个订阅 initialSource,我们将希望使用 Publish 来确保并发正常工作,从而得出最终答案。

【讨论】:

  • 请写下您期望它如何工作的时间表。
  • 这确实有效。我认为会有一个更简单的解决方案。有点难以阅读这里发生的事情,这并没有真正改善这个项目的维护。我不是你看到的唯一一个在做这件事的人。
  • 是否有特定的部分令人困惑?它是四行,即使它们有点密集。
  • 我必须承认,如果您不熟悉 PublishBufferScan 运算符及其重载,乍一看很难理解。 @Shlomo 你能进一步解释为什么这里需要Publish 运算符以及为什么不需要调用Connect() 方法吗?
  • @SimonCorcos,我经常收到这个问题,所以我把它写成问答:stackoverflow.com/questions/53747350/…
猜你喜欢
  • 2010-12-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多