【问题标题】:Using GroupBy with Concat only returns first group values将 GroupBy 与 Concat 一起使用仅返回第一组值
【发布时间】:2017-03-04 00:02:07
【问题描述】:

我有一些使用 GroupBy 的测试代码,可以按预期工作......

代码

var sw = Stopwatch.StartNew();
int groupSize = 5;

var coreObservable = Observable
    .Range(1, 20)
    .Select((x, idx) => new { x, idx })
    .GroupBy(x => x.idx / groupSize, x => x.x)
    .Select(x => x.ToList())
    .Replay()
    .RefCount();

coreObservable.Subscribe(
    x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
    () => Console.WriteLine("Subcription closed"));

coreObservable.Wait(); // blocking until observable completes

输出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed

问题是当我尝试将 Concat 与此表达式一起使用时...

代码

var sw = Stopwatch.StartNew();
int groupSize = 5;

var coreObservable = Observable
    .Range(1, 20)
    .Select((x, idx) => new { x, idx })
    .GroupBy(x => x.idx / groupSize, x => x.x)
    .Select(x => x.ToList())
    .Concat() // JUST ADDED THIS
    .Replay()
    .RefCount();

coreObservable.Subscribe(
    x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
    () => Console.WriteLine("Subcription closed"));

coreObservable.Wait(); // blocking until observable completes

输出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed

请注意,仅公开了第一组值。

我使用 GroupBy 而不是 Buffer 的原因是因为我试图用它来为突发的数据馈送创建最大大小的块。原始的 observable 可能是项目数组,当单个事件中有太多项目时,我想在其中拆分数组。

我想使用 Concat 的原因是因为我希望能够在数组事件之间创建延迟,就像很多人推荐的 here 一样。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    Concat()替换为Merge(),就可以正常使用了。

    我认为您的问题的原因是 Concat() 在当前序列完成之前不会开始收听下一个序列。

    连接图:

    s1 --0--1--2-|
    s2           -5--6--7--8--|
    r  --0--1--2--5--6--7--8--|
    

    Merge() 同时订阅所有子序列,并在任何子序列发布值时发布值。

    合并图:

    s1 --1--1--1--|
    s2 ---2---2---2|
    r  --12-1-21--2|
    

    因此,在您的情况下,Concat()Select(x => x.ToList()) 订阅第一个IObservable<IList<int>>,发布值直到完成,然后订阅下一个序列。 GroupBy() 将为它找到的每个组创建一个新的 IGroupedObservable 流,但是所有 IGroupedObservables 将同时完成:当基础流完成时。

    所以Concat()监听第一个流直到完成,但是当第一个流完成时,其他的也都完成了(因为它们其实都是同一个序列,只是key分割),所以没有值为它发布以下序列。

    所有图表都是从here 借来的,这是 Rx 的绝佳资源,我强烈建议您在此处查找有关各种运算符如何工作的任何问题。

    【讨论】:

    • 问题的一部分是因为我使用了热可观察对象吗?所以也许所有这些事件在 concat 订阅之前就已经发生了。
    • 是的,这正是问题所在。所有的 observables 都由 GroupBy() 一次性“启动”,但 Concat 一次只监听一个。
    【解决方案2】:

    你的问题可以简化成这样,这样想起来可能更简单:

    var sw = Stopwatch.StartNew();
    var subject = new Subject<int>();
    
    var o2 = subject.Where(i => i % 2 == 0).ToList();
    var o3 = subject.Where(i => i % 3 == 0).ToList();
    var o4 = subject.Where(i => i % 4 == 0).ToList();
    
    var c = Observable.Concat(o2, o3, o4)
    //      .Replay()
    //      .RefCount() 
    //.Replay().RefCount() has no impact here.
        ;
    
    c.Subscribe(
        x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
        () => Console.WriteLine("Subcription closed"));
    
    for(int i = 0; i < 6; i++)
        subject.OnNext(i);
    subject.OnCompleted();
    

    输出:

    Event raised [Values: 0|2|4, Timestamp: 00:00:00.0002278]
    Event raised [Values: , Timestamp: 00:00:00.0002850]
    Event raised [Values: , Timestamp: 00:00:00.0003049]
    Subcription closed
    

    如果你要对这些进行大理石图,它会是这样的:

    s   : 012345|
    o2  : ------(024)|
    o3  : ------(03) |
    o4  : ------(04) |
    
    cOut: ------(024)|
    cSub: (So2)------(So3)(So4) 
    
    cSub shows when c subscribes to child observables. cOut shows c's output. 
    So2 means subscribe to o2, So3 means subscribe to o3, etc..
    

    Concat 订阅传递给它的第一个 observable,然后仅在当前 observable 完成时订阅后续的 observable。在我们的例子中,ToList 在源完成之前不会省略任何内容,此时它会转储整个列表。所以o2o3o4 全部同时完成,但c 只订阅了o2o2 完成后,它会尝试订阅其他人,但他们已经完成了。

    至于如何修复它,Merge 会起作用,但我猜你想在组 2 之前处理组 1,Merge 会中断。

    【讨论】:

      猜你喜欢
      • 2018-08-17
      • 2019-12-21
      • 2019-01-09
      • 1970-01-01
      • 2022-01-16
      • 1970-01-01
      • 2020-12-03
      • 2017-12-14
      • 1970-01-01
      相关资源
      最近更新 更多