【问题标题】:Asynchronous map of non-deterministic function over array数组上非确定性函数的异步映射
【发布时间】:2017-02-18 00:37:26
【问题描述】:

我正在寻找一种方法将函数 f :: a -> IO(b) 并行映射到二维数组上,同时保持合理的内存消耗。
我还希望能够提供数组索引作为函数的参数,即映射g :: Int -> Int -> a -> IO(b),例如来自 Data.Vector 的imap,或来自 Data.Map 的mapWithKey
当前的尝试(见下文)要么有严重的内存消耗,要么在运行时抛出错误。

请注意,实际上我感兴趣的函数类型是h :: Int -> Int -> a -> Random b,其中Random 表示来自Control.Monad.Random 的一些Random monad;我使用evalRandIO 将其移至 IO monad。


尝试的解决方案:

假设我想将函数 foo :: Int -> Int -> a -> IO(b) 映射到类型为 a 的元素的二维数组上。 (这里 ab 是特定类型;没有隐含的通用量化。)
到目前为止,我已经尝试了以下方法:

  1. 带有 Control.Concurrent.Async 的简单列表

    import Control.Concurrent.Async(mapConcurrently)
    
    indexedArray :: [[(Int,Int,a)]]
    indexedArray = -- ...
    mappedArray = mapConcurrently (traverse (\(x,y,a) -> foo x y a)) indexedArray
    

这种方法的问题在于内存消耗超出图表(例如 4GB 供参考)。
如答案中所述,使用这种方法,我只评估并行行而不是所有元素,但这在实践中对我没有太大影响。

  1. 复盘

    import qualified Data.Array.Repa as R
    import Data.Array.Repa(Z(..), (:.)(..), U, DIM2)
    
    array :: R.Array U DIM2 a
    array = -- ...
    mappedArray = R.traverse array id (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y)))
    result = R.computeP mappedArray
    

注意R.traverse 不是Data.Traversable(traverse)。由于 Repa 数组不支持 Data.Traversable(traverse),我无法以任何方式对 IO 操作进行排序,因此我必须使用 unsafePerformIO 才能使用内置的 "traverse" 功能。
这种方式具有良好的性能和出色的内存消耗(大约 50MB 供参考)。
但是有一个问题,因为我一直收到以下运行时错误:thread blocked indefinitely in an MVar operation

3a。 Data.Vector 和 Control.Parallel

基本上与 Repa 相同的方法会导致相同的错误,thread blocked indefinitely in an MVar operation
我再次求助于使用unsafePerformIO,因为 Data.Vector 向量没有可遍历的实例。

    import qualified Data.Vector as V
    import Control.Parallel.Strategies(using)
    import Data.Vector.Strategies(parVector)

    array :: V.Vector (V.Vector a)
    array = -- ...
    mappedArray = V.imap (\ y row -> V.imap (\x a -> unsafePerformIO $ foo x y a ) row ) `using` (parVector 1)

与 Repa 相比,内存消耗和性能稍差(大约 100MB 供参考),但仍保持可比性。

3b。 Data.Vector 和 Control.Concurrent.Async

按照 sheyll 的建议,但使用平面向量而不是嵌套向量。

    import qualified Data.Vector.Unboxed as V
    import qualified Data.Vector.Unboxed.Mutable as M
    import Control.Concurrent.Async(forConcurrently_)

    mappedFlattenedArray = do
      flattenedMArray <- V.unsafeThaw $ -- ...
      forConcurrently_ [0..w*h] (\i -> do v <- M.unsafeRead flattenedMArray i
                                          let (y,x) = quotRem i w
                                          v' <- foo x y v
                                          M.unsafeWrite flattenedMArray i v' )
      V.unsafeFreeze flattenedMArray

不幸的是,这种方法的内存消耗非常高(~3GB)。我认为这是因为forConcurrently_ 创建了很多thunk?我不知道如何避免这个问题。

  1. Data.Array 和 Control.Concurrent.Async

按照 Alec 的建议,使用 Data.Array 数组的可遍历实例:

    import qualified Data.Array.Unboxed as A
    import Control.Concurrent.Async(mapConcurrently)

    indexedArray :: A.Array (Int,Int) ((Int,Int),a)
    indexedArray = -- ...
    mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray

再一次,内存消耗非常高(~3GB),即使使用未装箱的数组也是如此;问题可能与方法 1 和 3b 中的问题相同,因为 thunk 的累积会消耗大量内存。我不知道如何解决它。


使用 Repa 的整体性能和内存消耗似乎比任何其他方法都要好,而且我也很欣赏处理二维数组和能够映射使用索引的函数的内置功能。不幸的是,大多数时候我都会遇到上述运行时错误(但并非总是如此!)。

我之前说过foo 的返回类型是IO(b) 的唯一原因是由于不确定性。所以我想我可以将输出类型更改为一些Random monad,而不是做unsafePerformIO,我可以简单地使用给定的种子执行runRandom。不幸的是,这并没有解决问题,因为我一直收到错误thread blocked indefinitely in an MVar operation

有什么办法可以挽救方法 2 (Repa) 来规避这个错误吗?或者还有其他适用的方法吗? 我知道,一般来说,IO 必然会破坏并行性,因为不能保证 IO 操作不会发生冲突,但至少对于这个用例,我认为应该有一个解决方案。 (见:Why is there no mapM for repa arrays?

另请参阅以下问题:Parallel mapM on Repa arrays。但请注意,我事先不知道我的函数 foo 需要多少个随机数。

【问题讨论】:

  • 您阅读过unsafePerformIO 的文档吗?你能告诉我们foo吗?
  • 这个问题似乎是基于对IO在类型系统中的作用的误解。如果您有一个您“知道”没有副作用的函数A -&gt; IO B,那么您应该能够将其重写为具有IO (A -&gt; B) 类型。如果你不能这样做,那么你的函数确实有副作用。如果随机性是您唯一的副作用,那么 this 可能会有所帮助(特别是,文档说“这可以用于,例如,允许随机计算并行运行”)

标签: arrays multithreading haskell asynchronous parallel-processing


【解决方案1】:

您的第一种方法可能是您想要的,但不是链表。请注意,mapConcurrently :: Traversable t =&gt; (a -&gt; IO b) -&gt; t a -&gt; IO (t b) 类型允许您在 anythingTraversable 上执行相当于并行 traverse 的操作,包括 Array(我在这里建议 Array 超过 Vector 只是因为它更适合多个维度)。

import Control.Concurrent.Async (mapConcurrently)
import Data.Array

indexedArray :: Array (Int,Int) (Int,Int,a)
indexedArray = ...

mappedArray = mapConcurrently (\(x,y,a) -> foo x y a) indexedArray

另外,请注意,您之前使用嵌套列表的方法仅并行化了每个子列表的 traverse - 它并没有并行化整个事情。

【讨论】:

  • 谢谢,我没有意识到 Data.Array 数组是可遍历的,不幸的是 Data.Vector 向量和 Repa 数组都不是。
  • @will Data.vector 也是可遍历的——只是未装箱的版本
  • 哦,好吧,我没有意识到。您有什么建议可以缓解(我认为是)thunk 堆积的问题吗?正如我在(更新的)问题中解释的那样,所有使用 Async 的版本似乎都使用了至少 20 倍的内存量。
  • 使用来自async 包的Traversable 盒装Array/VectormapConcurrently 实例是非常低效的方法。首先,数组值被装箱(这种方法对于未装箱的数组是不可能的),因此对于每个数组单元,您必须遵循一个额外的指针。另一个问题是mapConcurrently 会为每个数组元素创建类似 3 个线程的东西,这也会产生一些显着的开销,尤其是当您考虑包含数百万个元素的数组时(1000x1000 并不是那么大的数组)
【解决方案2】:

晚了几年,但有一个库可以立即满足您的需求:massiv。有一个函数imapIO,它有一个类型签名(将m限制为IO时):

imapIO :: (Source r' ix a, Mutable r ix b)
  => (ix -> a -> IO b) -- ^ Index aware mapping action
  -> Array r' ix a -- ^ Source array
  -> IO (Array r ix b)

根据源数组的构造方式,此imapIO 可以自动并行化或按顺序运行。在下面的示例中,由于使用了ParrandomR 将被并行化:

λ> arr = makeArrayR D Par (Sz (5 :. 11)) $ \ (i :. j) -> (i, j)
λ> mapIO randomR arr :: IO (Array P Ix2 Int)
Array P Par (Sz (5 :. 11))
  [ [ 0, 1, 2, 0, 4, 4, 1, 7, 2, 8, 9 ]
  , [ 0, 1, 1, 1, 1, 4, 6, 2, 1, 8, 4 ]
  , [ 2, 1, 2, 3, 4, 5, 5, 3, 4, 9, 4 ]
  , [ 2, 2, 2, 3, 4, 5, 6, 7, 3, 8, 8 ]
  , [ 2, 4, 2, 3, 4, 4, 4, 4, 8, 8, 9 ]
  ]

话虽如此,这是一种非常糟糕且非常缓慢的生成随机数数组的方法。有几个原因:

  • 使用randomR(同样适用于evalRandIO)本质上使用一个存储在IORef 中的全局随机数生成器。这种方法是线程安全的,但是因为它处于临界区并且多个线程会同时尝试使用它,所以并行化不会那么有效。
  • 它在下面使用random 包,这非常慢,我的意思是比splitmix 或任何其他随机库慢约x250 倍。

有两种更好的生成随机数的方法:

  • 如果生成器是纯的且可拆分的,我们可以确定性地将初始生成器拆分为我们想要用于生成随机数组的线程数,而不是独立使用这些生成器来生成数组的一部分。有一个特殊的函数massiv 就是这样做的randomArray。 下面是一个使用 random 包的示例:
λ> import Data.Massiv.Array
λ> import System.Random as System
λ> gen <- System.newStdGen
λ> compute $ randomArray gen System.split System.random Par (Sz2 2 3) :: Array P Ix2 Double
Array P Par (Sz (2 :. 3))
  [ [ 0.8867416334370383, 0.6217394261977418, 0.4536893479057291 ]
  , [ 0.6566602646092554, 0.6988432454700997, 0.14116451452794965 ]
  ]
  • 另一方面,有状态的、不可拆分的生成器,例如mwc-random。对于那些有单独的函数,称为randomArrayWCimapWS,它们还可以非常有效地生成具有随机值的数组,同时每个线程使用单独的随机数生成器。请参阅我对 SO 问题的回答:Parallel mapM on Repa arrays,该问题也与此问题相关。

【讨论】:

    【解决方案3】:

    为了获得最大性能和紧凑的内存布局,而不需要任何不必要的数组复制,我建议使用Data.Vector.Storable.Mutable

    可以thaw/unsafeThaw任何不可变向量(例如Data.Vector.Storable)取回一个可变向量,它支持Data.Vector.Storable.Mutable中定义的操作,比如 readwrite,它们是带有 PrimMonad 约束的 monadic 动作PrimMonad 是一个基本的 monad,例如 IOST

    例如write的签名是:

    (PrimMonad m, Storable a) => MVector (PrimState m) a -> Int -> a -> m () 
    

    看看documentation for convertion to/from a mutable vector

    这看起来令人生畏,但实际上很简单:MVector (PrimState m) a 是你从 thaw 得到的,m 可能是 STIOPrimState ms 如果 @987654344 @ 是 ST sReadWorld 如果 mIOInt 参数只是元素索引,a 是新值。 此函数返回一个动作,其副作用是就地/破坏性地更新给定位置的向量。

    当一个向量变异完成后,你可以freeze/unsafeFreeze它,得到一个不可变向量, freezeunsafeFreezethawunsafeThaw 的反义词, 例如unsafeFreeze 具有类型签名:

    unsafeFreeze :: (Storable a, PrimMonad m) => MVector (PrimState m) a -> m (Vector a) 
    

    如您所见,该函数还返回一个带有PrimMonad 约束的单子操作,有关详细信息,请参阅the documentation of the primitive package

    现在,为了实现您的目标,据我了解,您将unsafeThaw outter 向量,然后concurrently(来自asyncunsafeThawread,应用foowrite 每个元素,最后是 unsafeFreeze 每个 inner 向量,然后是 unsafeFreeze outter 可变向量。

    请注意,这也可以通过未装箱的可变 IO 数组以类似的方式完成。

    另请注意,我从您的问题中假设,并行性应限于外部向量,即所有行应并行完成,而不是所有行中的所有元素。

    【讨论】:

    • 谢谢,我试试看。为了方便起见,我只是逐行进行并行处理,但我不介意任何一种方式。
    • 我在需要使用原生 C 库的应用程序中使用了类似的方法; Storable的特殊之处在于Storables的任意向量都可以直接作为C-Array,见Foreign.Storable
    • 这种方法是明智的,但不是最优的。首先,这描述了参差不齐的数组,其中它是可存储向量的盒装向量(即Stoable a =&gt; V.Vector (S.Vector a)),这意味着内存布局不是最佳的。其次,使用async 也不会提供最佳并行化,因为工作窃取调度程序具有只有少数线程会表现得更好。有关更多信息,请参阅我的答案。
    猜你喜欢
    • 1970-01-01
    • 2020-01-21
    • 2016-08-28
    • 1970-01-01
    • 2021-03-06
    • 1970-01-01
    • 2022-11-16
    • 2014-07-21
    • 1970-01-01
    相关资源
    最近更新 更多