【发布时间】:2017-03-04 00:02:07
【问题描述】:
我有一些使用 GroupBy 的测试代码,可以按预期工作......
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Replay()
.RefCount();
coreObservable.Subscribe(
x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed
问题是当我尝试将 Concat 与此表达式一起使用时...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Concat() // JUST ADDED THIS
.Replay()
.RefCount();
coreObservable.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed
请注意,仅公开了第一组值。
我使用 GroupBy 而不是 Buffer 的原因是因为我试图用它来为突发的数据馈送创建最大大小的块。原始的 observable 可能是项目数组,当单个事件中有太多项目时,我想在其中拆分数组。
我想使用 Concat 的原因是因为我希望能够在数组事件之间创建延迟,就像很多人推荐的 here 一样。
【问题讨论】:
标签: c# system.reactive