【问题标题】:Change interval of RX operators?更改 RX 运算符的间隔?
【发布时间】:2015-11-02 10:57:40
【问题描述】:

这可能是一个愚蠢的问题,因为我对 RX 有点陌生 :)

我正在采样一个事件(.Net 4.0 的 RX):

eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(x =>Console.WriteLine("testing:" + x.Value.EventArgs.str));

问题是采样时间需要能够即时更改,我想我可以创建一些属性来删除现有处理程序并在它更改时创建一个新处理程序,但它似乎有点混乱且更容易受到攻击到时间问题。有没有办法简单地改变间隔?

示例:假设某人正在编写一串字符,当检测到某个序列时,您希望更改采样时间而不会丢失事件,并且最好不要多次获取事件

【问题讨论】:

  • 它是自动完成,但采样间隔因数据源而异(例如,本地查找比 Web 服务更快)

标签: .net system.reactive


【解决方案1】:

我不知道改变现有采样间隔的方法,但您可以做的是以您需要的最高频率进行采样,然后使用 Where 子句进行过滤它使用您可以更改的变量。

例如:

static IObservable<T> SampleEvery<T>(this IObservable<T> source,
    Func<int> multipleProvider)
{
    int counter = 0;
    Func<T, bool> predicate = ignored => {
        counter++;
        if (counter >= multipleProvider())
        {
            counter = 0;
        }
        return counter == 0;
    };
    return source.Where(predicate);
}

然后你可以这样称呼它:

// Keep this somewhere you can change it
int multiple = 1;

eventAsObservable.Sample(TimeSpan.FromSeconds(1))
                 .SampleEvery(() => multiple)
                 .Timestamp()
                 .Subscribe(x => Console.WriteLine("testing:" + 
                                                   x.Value.EventArgs.str));

现在,更改multiple 的值将更改有效采样频率。

这是一个非常丑陋的 hack,但我认为它应该可以工作。

【讨论】:

  • 您是否在“if (counter >= multipleProvider)”上缺少 ()
  • 似乎是一个可行的解决方案,我会给它一些测试,谢谢!我想知道对于这样的场景有什么好的解决方案......也许能够发送一个返回时间跨度而不是实际时间跨度的方法/lambda。您想即时将参数更改为不同的运算符似乎并不牵强
  • 正是我需要的,没有找到其他方法来测试我的解决方案
【解决方案2】:

我知道这个问题已经得到解答,但我想我会添加另外几种以 Rx 方式解决它的方法。

您可以在TimeSpan 的序列上使用Switch

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();

sampleFrequencies
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
    .Switch()
    .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));

// To change:
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));

或者,也可以使用DeferTakeUntilRepeat 来解决(这个有点疯狂,包含在一个思考练习中):

private TimeSpan sampleFrequency = TiemSpan.FromSeconds(2);
private Subject<Unit> frequencyChanged = new Subject<Unit>();

(Observable
    .Defer(() => eventAsObservable
       .Sample(Observable.Interval(sampleFrequency)
    )
    .Timestamp()
    .TakeUntil(frequencyChanged)
).Repeat()
.Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5);
// frequencyChanged.OnNext(new Unit());

【讨论】:

  • 实际上,我最终只是做了一个属性,它创建了一个新订阅,然后处置了旧订阅。我认为它两次触发事件的可能性很小,但我认为使用其他方法引入错误/开销的风险使其成为最有吸引力的解决方案。例如,我可能想将 Sample 更改为 Throttle。其他解决方案似乎有点过头了,但我感谢您的帮助!
  • 我的两个解决方案仍然使用 Sample,因此可以轻松地将其更改为 Throttle(时间戳内容直接取自您的要求)。 Switch 版本几乎可以完成您现在正在做的事情(取消、重新启动),但在 Switch 内部。
【解决方案3】:

TL;DR: 使用 ObservableFromIntervalFunctor 创建一个 Observable,如下所示:

void Main()
{
    // Pick an initial period, it can be changed later.
    var intervalPeriod = TimeSpan.FromSeconds(1);

    // Create an observable using a functor that captures the interval period.
    var o = ObservableFromIntervalFunctor(() => intervalPeriod);

    // Log every value so we can visualize the observable.
    o.Subscribe(Console.WriteLine);

    // Sleep for a while so you can observe the observable.
    Thread.Sleep(TimeSpan.FromSeconds(5.0));

    // Changing the interval period will takes effect on next tick.
    intervalPeriod = TimeSpan.FromSeconds(0.3);

}

IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
{
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
}

解释: Observable.Generate 有一个重载,允许您指定通过仿函数生成下一个值的时间。通过传递一个捕获时间跨度变量的函子,您可以通过更改捕获的时间跨度变量来改变 observable.interval 周期。

Linqpad sn-p here

【讨论】:

  • 注意:如果间隔由慢变快,这种方法必须等到正在进行的慢间隔完成后,间隔更改才会生效。这与 Jon Skeet 的回答形成对比,后者将在最短采样时间后生效。
【解决方案4】:

你为什么不订阅两次?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x)),
).Subscribe(Console.WriteLine);

或者,如果搜索仅基于某种前缀或限定符(如 Google Chrome 的“?”)处于活动状态运营商:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x).SelectMany(x => doRemoteLookup(x)),
).Subscribe(Console.WriteLine);

【讨论】:

  • 间隔可以是任何值,可以根据每个源进行自定义。到目前为止,最好的解决方案是创建一个新订阅,然后处理旧订阅,但事件被触发两次的可能性很小。如果有办法访问实际属性,那就太好了
  • 我的意思是,您可以让它们全部运行,但通过 Where 子句切换它们的输出。也许我不太明白你在做什么......
猜你喜欢
  • 1970-01-01
  • 2018-03-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-11-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多