【发布时间】:2016-12-21 02:28:32
【问题描述】:
我想同时处理一组 io 绑定作业,但绑定/限制未完成(正在运行)的并发作业的数量。
分块是增加并发性的一种简单方法,但如果项目花费不同的时间,则会产生瓶颈。
我发现这样做有一些问题1)。有没有办法避免以下问题,同时保持相当的惯用和简洁?
1) 使用 BlockingCollection(如下所示)。但是,这导致了一个解决方案,其中这里的并发是由boundedSize 数量的“消费者”线程生成的。我正在寻找一种不需要boundedSize 线程数来实现boundedSize 并发作业的解决方案。 (如果boundedSize 很大怎么办?)。我没有看到如何获取一个项目,处理它,然后信号完成。我只能拿东西......因为我不想一次翻阅整个列表,所以消费者需要同步运行它。
type JobNum = int
let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) =
// create a BlockingCollection
use bc = new BlockingCollection<Async<Unit>>(boundedSize)
// put async jobs on BlockingCollection
Async.Start(async {
{ start .. finish }
|> Seq.map mkJob
|> Seq.iter bc.Add
bc.CompleteAdding()
})
// each consumer runs it's job synchronously
let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job }
// create `boundedSize` number of consumers in parallel
{ 1 .. boundedSize }
|> Seq.map mkConsumer
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
let Test () =
let boundedSize = 15
let start = 1
let finish = 50
let mkJob = (fun jobNum -> async {
printfn "%A STARTED" jobNum
do! Async.Sleep(5000)
printfn "%A COMPLETED" jobNum
})
RunConcurrentlyBounded boundedSize start finish mkJob
我知道 TPL 和邮箱处理器,但认为可能有一些简单而强大的东西,但避免了大量线程创建路由。
理想情况下只有一个生产者线程和一个消费者线程;我怀疑 BlockingCollection 可能不是这种情况下的正确并发原语?
【问题讨论】:
-
为什么不是 TPL?使用起来非常简单。
标签: asynchronous concurrency f#