【问题标题】:Reactive Extensions .MaxBy反应式扩展 .MaxBy
【发布时间】:2017-09-12 05:55:04
【问题描述】:

为什么

var a = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish();

a.Subscribe(o =>
{
    Console.WriteLine("Test");
});
a.Connect();

火,但不是

var a = Observable.Interval(TimeSpan.FromSeconds(1))
    .MaxBy(o=>o)
    .Publish();

a.Subscribe(o =>
{
    Console.WriteLine("Test");
});
a.Connect();

我正在尝试在不同的场景中使用 MaxBy,但什至无法使上述方法正常工作。

这是我的更复杂的例子

var _telemetryBatchObservable = Observable.FromEventPattern<DeviceStateStreamEventArg>(
        ev => DeviceStateStreamEvent += ev,
        ev => DeviceStateStreamEvent -= ev)
    .Synchronize()
    .GroupBy(o => o.EventArgs.DeviceId)
    .Select(o => o.MaxBy(i => i.EventArgs.DateTimeOffset))
    .SelectMany(o => o.Select(i => i))
    .SelectMany(o => o.Select(i => i))
    .Buffer(TimeSpan.FromMilliseconds(5000), 100)
    .Publish();

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    MaxMaxBy 在源 observable 终止时只发出一个值。如果您有一个非终止源,它们将永远不会发出。

    试试这个代码作为反例:

    var a = Observable.Interval(TimeSpan.FromSeconds(1))
        .Take(3) //causes termination after 3 values
        .MaxBy(o => o)
        .Publish();
    

    【讨论】:

    • 如何知道哪些对非终止源有效,哪些无效?
    • 文档,实验。几乎所有聚合运算符都只在终止时发出:唯一的例外是 Scan。 Rx 在 Linq 之后进行了模式化,它也只在可枚举完成时发出单个值。
    • 缓冲区和样本在实时流中运行良好...我找不到任何说明 MaxBy 不应该与非终止源一起使用的文档。
    • 有两个最好的文档站点,都准确地描述了它:introtorx.com/Content/v1.0.10621.0/…reactivex.io/documentation/operators/max.html
    • 如果您认为缺少文档,请随时填补空白。
    【解决方案2】:

    Shlomo 指出 .MaxBy 无法在非终止序列上产生值。

    如果您愿意,您可以使用.Scan 创建迄今为止最大值的序列。

    你可以这样做:

    var a =
        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Select(x => x % 4L)
            .Scan(long.MinValue, (x, y) => x > y ? x : y)
            .Publish();
    
    a.Subscribe(o =>
    {
        Console.WriteLine(o);
    });
    a.Connect();
    

    我输入了.Select(x =&gt; x % 4L) 以使序列更有趣。

    这给了:

    0 1 2 3 3 3 3 3

    如果您只想在有新最大值时生成一个值,则在.Scan 之后添加一个.DistinctUntilChanged()。很好很简单。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-02-10
      • 2011-08-27
      • 2012-01-15
      • 1970-01-01
      • 1970-01-01
      • 2013-01-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多