【发布时间】:2017-02-18 00:37:26
【问题描述】:
我正在寻找一种方法将函数 f :: a -> IO(b) 并行映射到二维数组上,同时保持合理的内存消耗。
我还希望能够提供数组索引作为函数的参数,即映射g :: Int -> Int -> a -> IO(b),例如来自 Data.Vector 的imap,或来自 Data.Map 的mapWithKey。
当前的尝试(见下文)要么有严重的内存消耗,要么在运行时抛出错误。
请注意,实际上我感兴趣的函数类型是h :: Int -> Int -> a -> Random b,其中Random 表示来自Control.Monad.Random 的一些Random monad;我使用evalRandIO 将其移至 IO monad。
尝试的解决方案:
假设我想将函数 foo :: Int -> Int -> a -> IO(b) 映射到类型为 a 的元素的二维数组上。 (这里 a 和 b 是特定类型;没有隐含的通用量化。)
到目前为止,我已经尝试了以下方法:
-
带有 Control.Concurrent.Async 的简单列表
import Control.Concurrent.Async(mapConcurrently) indexedArray :: [[(Int,Int,a)]] indexedArray = -- ... mappedArray = mapConcurrently (traverse (\(x,y,a) -> foo x y a)) indexedArray
这种方法的问题在于内存消耗超出图表(例如 4GB 供参考)。
如答案中所述,使用这种方法,我只评估并行行而不是所有元素,但这在实践中对我没有太大影响。
-
复盘
import qualified Data.Array.Repa as R import Data.Array.Repa(Z(..), (:.)(..), U, DIM2) array :: R.Array U DIM2 a array = -- ... mappedArray = R.traverse array id (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y))) result = R.computeP mappedArray
注意R.traverse 不是Data.Traversable(traverse)。由于 Repa 数组不支持 Data.Traversable(traverse),我无法以任何方式对 IO 操作进行排序,因此我必须使用 unsafePerformIO 才能使用内置的 "traverse" 功能。
这种方式具有良好的性能和出色的内存消耗(大约 50MB 供参考)。
但是有一个问题,因为我一直收到以下运行时错误:thread blocked indefinitely in an MVar operation。
3a。 Data.Vector 和 Control.Parallel
基本上与 Repa 相同的方法会导致相同的错误,thread blocked indefinitely in an MVar operation。
我再次求助于使用unsafePerformIO,因为 Data.Vector 向量没有可遍历的实例。
import qualified Data.Vector as V
import Control.Parallel.Strategies(using)
import Data.Vector.Strategies(parVector)
array :: V.Vector (V.Vector a)
array = -- ...
mappedArray = V.imap (\ y row -> V.imap (\x a -> unsafePerformIO $ foo x y a ) row ) `using` (parVector 1)
与 Repa 相比,内存消耗和性能稍差(大约 100MB 供参考),但仍保持可比性。
3b。 Data.Vector 和 Control.Concurrent.Async
按照 sheyll 的建议,但使用平面向量而不是嵌套向量。
import qualified Data.Vector.Unboxed as V
import qualified Data.Vector.Unboxed.Mutable as M
import Control.Concurrent.Async(forConcurrently_)
mappedFlattenedArray = do
flattenedMArray <- V.unsafeThaw $ -- ...
forConcurrently_ [0..w*h] (\i -> do v <- M.unsafeRead flattenedMArray i
let (y,x) = quotRem i w
v' <- foo x y v
M.unsafeWrite flattenedMArray i v' )
V.unsafeFreeze flattenedMArray
不幸的是,这种方法的内存消耗非常高(~3GB)。我认为这是因为forConcurrently_ 创建了很多thunk?我不知道如何避免这个问题。
- Data.Array 和 Control.Concurrent.Async
按照 Alec 的建议,使用 Data.Array 数组的可遍历实例:
import qualified Data.Array.Unboxed as A
import Control.Concurrent.Async(mapConcurrently)
indexedArray :: A.Array (Int,Int) ((Int,Int),a)
indexedArray = -- ...
mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray
再一次,内存消耗非常高(~3GB),即使使用未装箱的数组也是如此;问题可能与方法 1 和 3b 中的问题相同,因为 thunk 的累积会消耗大量内存。我不知道如何解决它。
使用 Repa 的整体性能和内存消耗似乎比任何其他方法都要好,而且我也很欣赏处理二维数组和能够映射使用索引的函数的内置功能。不幸的是,大多数时候我都会遇到上述运行时错误(但并非总是如此!)。
我之前说过foo 的返回类型是IO(b) 的唯一原因是由于不确定性。所以我想我可以将输出类型更改为一些Random monad,而不是做unsafePerformIO,我可以简单地使用给定的种子执行runRandom。不幸的是,这并没有解决问题,因为我一直收到错误thread blocked indefinitely in an MVar operation。
有什么办法可以挽救方法 2 (Repa) 来规避这个错误吗?或者还有其他适用的方法吗? 我知道,一般来说,IO 必然会破坏并行性,因为不能保证 IO 操作不会发生冲突,但至少对于这个用例,我认为应该有一个解决方案。 (见:Why is there no mapM for repa arrays?)
另请参阅以下问题:Parallel mapM on Repa arrays。但请注意,我事先不知道我的函数 foo 需要多少个随机数。
【问题讨论】:
-
您阅读过
unsafePerformIO的文档吗?你能告诉我们foo吗? -
这个问题似乎是基于对
IO在类型系统中的作用的误解。如果您有一个您“知道”没有副作用的函数A -> IO B,那么您应该能够将其重写为具有IO (A -> B)类型。如果你不能这样做,那么你的函数确实有副作用。如果随机性是您唯一的副作用,那么 this 可能会有所帮助(特别是,文档说“这可以用于,例如,允许随机计算并行运行”)
标签: arrays multithreading haskell asynchronous parallel-processing