【问题标题】:bound/throttle concurrent jobs, without creating a thread-per-job绑定/限制并发作业,而不为每个作业创建线程
【发布时间】:2016-12-21 02:28:32
【问题描述】:

我想同时处理一组 io 绑定作业,但绑定/限制未完成(正在运行)的并发作业的数量。

分块是增加并发性的一种简单方法,但如果项目花费不同的时间,则会产生瓶颈。

我发现这样做有一些问题1)。有没有办法避免以下问题,同时保持相当的惯用和简洁?

1) 使用 BlockingCollection(如下所示)。但是,这导致了一个解决方案,其中这里的并发是由boundedSize 数量的“消费者”线程生成的。我正在寻找一种不需要boundedSize 线程数来实现boundedSize 并发作业的解决方案。 (如果boundedSize 很大怎么办?)。我没有看到如何获取一个项目,处理它,然后信号完成。我只能拿东西......因为我不想一次翻阅整个列表,所以消费者需要同步运行它。

type JobNum = int

let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>)  =

    // create a BlockingCollection
    use bc = new BlockingCollection<Async<Unit>>(boundedSize)

    // put async jobs on BlockingCollection
    Async.Start(async {
        { start .. finish }
        |> Seq.map mkJob
        |> Seq.iter bc.Add
        bc.CompleteAdding()
    })

    // each consumer runs it's job synchronously
    let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job }

    // create `boundedSize` number of consumers in parallel
    { 1 .. boundedSize }
    |> Seq.map mkConsumer
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore

let Test () = 
    let boundedSize = 15 
    let start = 1
    let finish = 50
    let mkJob = (fun jobNum -> async { 
        printfn "%A STARTED" jobNum 
        do! Async.Sleep(5000)
        printfn "%A COMPLETED" jobNum 
    })
    RunConcurrentlyBounded boundedSize start finish mkJob

我知道 TPL 和邮箱处理器,但认为可能有一些简单而强大的东西,但避免了大量线程创建路由。

理想情况下只有一个生产者线程和一个消费者线程;我怀疑 BlockingCollection 可能不是这种情况下的正确并发原语?

【问题讨论】:

  • 为什么不是 TPL?使用起来非常简单。

标签: asynchronous concurrency f#


【解决方案1】:

使用SemaphoreSlim,这似乎与我将要获得的一样好。

我想底层的 ThreadPool 确实控制了这里的并发性。

let RunConcurrentlySemaphore (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>)  =

    use ss = new SemaphoreSlim(boundedSize);

    { start .. finish } 
      |> Seq.map (mkJob >> fun job -> async { 
          do! Async.AwaitTask(ss.WaitAsync())
          try do! job finally ss.Release() |> ignore
      })
      |> Async.Parallel
      |> Async.RunSynchronously

【讨论】:

    猜你喜欢
    • 2021-10-15
    • 1970-01-01
    • 1970-01-01
    • 2015-12-04
    • 1970-01-01
    • 2021-09-04
    • 2010-12-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多