【问题标题】:Reactive (RX) throttle without loss无损失反应式 (RX) 油门
【发布时间】:2016-03-14 12:36:26
【问题描述】:

在我开始写之前,我认为值得一问:RX 有一个油门扩展方法,如果事件发生得太快,它会丢弃事件。

因此,如果您要求它将事件限制为每 5 秒 1 个,如果您在 0.1 秒后收到一个事件,然后在 1 秒后收到第二个事件,您将收到一个事件,然后是静音。

我想要的是它在 0.1 秒后引发第一个事件,但在 4.9 秒后引发另一个事件。

此外,如果我在 0.1、1 和 2 秒时收到事件,我希望它在 0.1 秒、5 秒时引发事件,然后什么都没有,所以我不希望它捕获 n 个事件并且每个周期只释放一个n 个周期。

Buffer 则相反,它将所有内容保存 5 秒,然后引发事件,因此既不是油门也不是缓冲区,而是介于两者之间的东西。

有没有办法用现有的框架做到这一点,还是我需要写一个?

【问题讨论】:

  • 你能解释一下第二种情况吗? A(0.1s), B(1s), C(2s) => A(0.1s) 是这样吗?与第一种情况有什么区别?
  • 当然塔马斯。 A(0.1s),B(1s), C(2s) 将导致 A(0.1s), B(5s)。如果我们只是对它进行评级限制,您最终可能会得到 A(0.1s)、B(5s)、C(10s),这是我不想要的。所以你是正确的结果是一样的,但输入不是。这更有意义吗?
  • 所以,无论您在 5 中收到多少事件,您都只想要前两个,第一个按照它出现的确切顺序,第二个在缓冲间隔的边缘?
  • 听起来.Window(TimeSpan) 可能是您需要的。
  • 是的,Tamas,想象一下你有一辆公共汽车和一个数据库。有人将一条记录写入数据库,然后在总线上发送一条消息,服务会接收该消息。然后它去数据库获取记录。现在想象第二条记录被写入并发送第二条消息。如果我们使用 Throttle,RX 将忽略第二条消息,因此我们不会再次检查数据库,除非之后有新请求。如果我们使用 Buffer,在开始处理之前会有 5 秒的延迟。

标签: c# system.reactive


【解决方案1】:

我认为您将不得不编写自己的运算符,或者玩弄Window。与其他 cmets 一样,我不能 100% 确定您的要求,但我已尝试在这些测试中捕获它们。

using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

[TestFixture]
public class Throttle : ReactiveTest
{
    private TestScheduler _testScheduler;
    private ITestableObservable<int> _sourceSequence;
    private ITestableObserver<int> _observer;

    [SetUp]
    public void SetUp()
    {
        var windowPeriod = TimeSpan.FromSeconds(5);
        _testScheduler = new TestScheduler();
        _sourceSequence = _testScheduler.CreateColdObservable(
            //Question does the window start when the event starts, or at time 0?
            OnNext(0.1.Seconds(), 1),
            OnNext(1.0.Seconds(), 2),
            OnNext(2.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
            );

        _observer = _testScheduler.CreateObserver<int>();
        _sourceSequence
            .Window(windowPeriod, _testScheduler)
            .SelectMany(window =>
                window.Publish(
                    shared => shared.Take(1).Concat(shared.Skip(1).TakeLast(1))
                )
            )
            .Subscribe(_observer);
        _testScheduler.Start();
    }

    [Test]
    public void Should_eagerly_publish_new_events()
    {
        Assert.AreEqual(OnNext(0.1.Seconds(), 1), _observer.Messages[0]);
    }

    [Test]
    public void Should_publish_last_event_of_a_window()
    {
        //OnNext(1.0.Seconds(), 2) is ignored. As OnNext(5.0.Seconds(), 3) occurs after it, and before the end of a window, it is yeiled.
        Assert.AreEqual(OnNext(5.0.Seconds(), 3), _observer.Messages[1]);
    }

    [Test]
    public void Should_only_publish_event_once_if_it_is_the_only_event_for_the_window()
    {
        Assert.AreEqual(OnNext(7.0.Seconds(), 4), _observer.Messages[2]);
        Assert.AreEqual(OnCompleted<int>(100.0.Seconds()), _observer.Messages[3]);
    }

    [Test]
    public void AsOneTest()
    {
        var expected = new[]
        {
            OnNext(0.1.Seconds(), 1),
            //OnNext(1.0.Seconds(), 2) is ignored. As OnNext(5.0.Seconds(), 3) occurs after it, and before the end of a window, it is yeiled.
            OnNext(5.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
        };
        CollectionAssert.AreEqual(expected, _observer.Messages);
    }
}

【讨论】:

  • 我认为你是绝对正确的,这需要一些工作。额外有用的是,在看到此代码之前,我不知道 Microsoft ReactiveTest 类。我以前必须做一些“事情”来测试,这看起来更干净。谢谢。
  • NP。如果你想出了一个解决方案,如果你也把它和你的测试一起发布在这里会很棒。
  • 这个周末我要试一试。我想要一个 RX 解决方案,因为我试图在这里推动采用,这将是一个很好的展示案例。
  • 听起来不错,我强烈建议您首先提出一套测试,以便您知道您已经考虑了所有名义场景和边缘情况(窗口中没有项目,窗口中只有单个项目, 一个窗口中的两个项目, 一个窗口中的许多项目, 连续的空窗口, onError, onComplete 等...)
  • 我认为这是个好建议。我对刚刚从 nuget 安装的测试支持库一无所知,所以我可以先试一试并弄清楚这一切是如何工作的。
猜你喜欢
  • 2021-08-23
  • 2021-05-19
  • 2018-02-16
  • 1970-01-01
  • 2021-07-28
  • 2019-03-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多