【问题标题】:Can Haskell's Control.Concurrent.Async.mapConcurrently have a limit?Haskell 的 Control.Concurrent.Async.mapConcurrently 可以有限制吗?
【发布时间】:2013-09-24 14:55:33
【问题描述】:

我试图在 Haskell 中并行运行多个下载,我通常只使用 Control.Concurrent.Async.mapConcurrently 函数。但是,这样做会打开大约 3000 个连接,这会导致 Web 服务器拒绝所有连接。是否可以完成与 mapConcurrently 相同的任务,但一次只能打开有限数量的连接(即一次只能打开 2 个或 4 个)?

【问题讨论】:

标签: haskell asynchronous concurrency io


【解决方案1】:

一个快速的解决方案是使用semaphore 来限制并发操作的数量。这不是最佳的(所有线程都是一次创建然后等待),但可以:

import Control.Concurrent.MSem
import Control.Concurrent.Async
import Control.Concurrent (threadDelay)
import qualified Data.Traversable as T

mapPool :: T.Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do
    sem <- new max
    mapConcurrently (with sem . f) xs

-- A little test:
main = mapPool 10 (\x -> threadDelay 1000000 >> print x) [1..100]

【讨论】:

  • 非常感谢,这完全解决了我的问题。一次创建的每个线程都不是问题,程序的总内存消耗保持在 250 MB 以下。
【解决方案2】:

你也可以试试pooled-io 包,你可以写:

import qualified Control.Concurrent.PooledIO.Final as Pool
import Control.DeepSeq (NFData)
import Data.Traversable (Traversable, traverse)

mapPool ::
   (Traversable t, NFData b) =>
   Int -> (a -> IO b) -> t a -> IO (t b)
mapPool n f = Pool.runLimited n . traverse (Pool.fork . f)

【讨论】:

    【解决方案3】:

    如果您在列表中有操作,则此操作具有较少的依赖关系

    import Control.Concurrent.Async (mapConcurrently)
    import Data.List.Split (chunksOf)
    
    mapConcurrentChunks :: Int -> (a -> IO b) -> [a] -> IO [b]
    mapConcurrentChunks n ioa xs = concat <$> mapM (mapConcurrently ioa) (chunksOf n xs)
    

    编辑:只是缩短了一点

    【讨论】:

    • 这不是最优的,因为假设 n=3,如果一个任务特别长,其他 2 个线程将闲置等待第三个任务完成。理想情况下,您希望在前一个线程完成后立即启动下一个线程。
    • 确实如此。我将其作为内存问题的快速解决方法,并且线程速度相似。绝对不是一个通用的解决方案。感谢您指出这一点。
    【解决方案4】:

    如果其中一些线程的持续时间明显长于其他线程,则对线程进行分块可能效率低下。这是一个更流畅但更复杂的解决方案:

    {-# LANGUAGE TupleSections #-}
    import Control.Concurrent.Async (async, waitAny)
    import Data.List                (delete, sortBy)
    import Data.Ord                 (comparing)
    
    concurrentlyLimited :: Int -> [IO a] -> IO [a]
    concurrentlyLimited n tasks = concurrentlyLimited' n (zip [0..] tasks) [] []
    
    concurrentlyLimited' _ [] [] results = return . map snd $ sortBy (comparing fst) results
    concurrentlyLimited' 0 todo ongoing results = do
        (task, newResult) <- waitAny ongoing
        concurrentlyLimited' 1 todo (delete task ongoing) (newResult:results)
    concurrentlyLimited' n [] ongoing results = concurrentlyLimited' 0 [] ongoing results
    concurrentlyLimited' n ((i, task):otherTasks) ongoing results = do
        t <- async $ (i,) <$> task
        concurrentlyLimited' (n-1) otherTasks (t:ongoing) results
    

    注意:感谢lifted-async,使用MonadBaseControl IO 的实例代替IO,可以使上述代码更通用。

    【讨论】:

      【解决方案5】:

      使用Control.Concurrent.Spawn 库很容易做到这一点:

      import Control.Concurrent.Spawn
      
      type URL      = String
      type Response = String    
      
      numMaxConcurrentThreads = 4
      
      getURLs :: [URL] -> IO [Response]
      getURLs urlList = do
         wrap <- pool numMaxConcurrentThreads
         parMapIO (wrap . fetchURL) urlList
      
      fetchURL :: URL -> IO Response
      

      【讨论】:

        猜你喜欢
        • 2017-04-08
        • 2021-05-10
        • 2017-07-10
        • 2013-06-20
        • 1970-01-01
        • 2012-03-29
        • 2021-04-11
        • 2013-05-03
        • 1970-01-01
        相关资源
        最近更新 更多