【问题标题】:How to limit buffering when concurrently processing streams with Reactive Extensions使用 Reactive Extensions 同时处理流时如何限制缓冲
【发布时间】:2012-06-24 06:49:43
【问题描述】:

我有一个交错流,我 split into separate sequential streams

制片人

int streamCount = 3;

new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

其中outputs[] 是处理流的主题,定义如下。 .ObserveOn() 用于同时处理流(多线程)。

消费者

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));

这段代码的问题是它会尽可能快地读取整个枚举,即使输出流无法赶上。在我的情况下,可枚举是一个文件流,所以这可能会导致使用大量内存。因此,如果缓冲区达到某个阈值,我希望阻止读取。

【问题讨论】:

    标签: c# multithreading system.reactive


    【解决方案1】:

    我已经通过在生产者和消费者上使用信号量来解决这个问题,如下所示。但是,我不确定这是否是一个好的解决方案(就 Rx 合约、编程风格等而言)。

    var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
    
    // add to producer (before subscribe)
    .Do(_ => semaphore.Wait());
    
    // add to consumer (before subscribe)
    .Do(_ => semaphore.Release()))
    

    CancelationToken 传递给对Wait() 的调用并确保在流异常停止时取消它可能是个好主意?

    【讨论】:

      【解决方案2】:

      我认为您的解决方案非常合理。最大的问题(对上一个问题有一些背景)是您的解决方案的“内部”目前无处不在。只需确保在正确编码时清理以下内容:

      • 将所有内容包装到一个公开单个方法的类中:IDisposable Subscribe(&lt;index&gt;, Action)IObservable&lt;element&gt; ToObservable(&lt;index&gt;))。返回的订阅或返回的 observable 都已经完成了所有的“工作”,即添加的 Do 操作等等。它下面有一个字典或列表这一事实应该与用户完全无关,否则在这里对您的代码进行任何更改都需要在整个地方进行更改。

      • CancelationToken 是个好主意,请确保在 OnCompletedOnError 上取消它,您可以使用 Do 的重载来取消它。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-10-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-05
        • 1970-01-01
        • 2020-09-05
        相关资源
        最近更新 更多