【问题标题】:Strict evaluation techniques for concurrent channels in HaskellHaskell中并发通道的严格评估技术
【发布时间】:2011-03-06 16:41:50
【问题描述】:

我正在玩弄 Haskell 线程,并且遇到了跨通道传递惰性求值值的问题。例如,如果有 N 个工作线程和 1 个输出线程,工作人员会传达未评估的工作,而输出线程最终会为他们完成工作。

我已在各种文档中阅读过有关此问题的信息并查看了各种解决方案,但我只找到了一个有效的解决方案,而其余的则没有。下面是一些代码,其中工作线程开始一些可能需要很长时间的计算。我按降序启动线程,这样第一个线程应该花费的时间最长,后面的线程应该更早完成。

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan   -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
  logV <- spawn1 readWriteLoop
  say "START"
  forM_ [18,17..10] $ spawn . busyWork
  await
  writeChan logCh Nothing -- poison the logger
  takeMVar logV
  putStrLn "DONE"
  where
    say mesg = force mesg >>= writeChan logCh . Just

    force s = mapM evaluate s  -- works
--    force s = return $ s `using` rdeepseq  -- no difference
--    force s = return s -- no-op; try this with strict channel

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
    embiggen i = i*i*i*i*i

    readWriteLoop = readChan logCh >>= writeReadLoop
    writeReadLoop Nothing = return ()
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

    spawn1 action = do
      v <- newEmptyMVar
      forkIO $ action `finally` putMVar v ()
      return v

    spawn action = do
      v <- spawn1 action
      modifyMVar statVars $ \vs -> return (v:vs, ())

    await = do
      vs <- modifyMVar statVars $ \vs -> return ([], vs)
      mapM_ takeMVar vs

使用大多数技术,结果按产生的顺序报告;也就是说,运行时间最长的计算在前。我将此解释为输出线程正在完成所有工作:

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE

我认为这个问题的答案是严格的渠道,,但他们没有奏效。我知道字符串的 WHNF 是不够的,因为这只会强制最外层的构造函数(字符串的第一个字符为 nil 或 cons)。 rdeepseq 应该完全评估,但它没有区别。我发现唯一可行的方法是将Control.Exception.evaluate :: a -&gt; IO a 映射到字符串中的所有字符。 (请参阅代码中的 force 函数 cmets 以了解几种不同的替代方案。)这是 Control.Exception.evaluate 的结果:

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE

那么,为什么严格渠道或rdeepseq 不产生这个结果呢?还有其他技术吗?我是否误解了为什么第一个结果被破坏了?

【问题讨论】:

    标签: haskell concurrency strictness


    【解决方案1】:

    这里有两个问题。

    第一次尝试(使用显式 rnf)不起作用的原因是,通过使用 return,您创建了一个在评估时会完全评估自身的 thunk,但是 thunk本身没有被评估。请注意,evaluate 的类型是a -&gt; IO a:它在IO 中返回一个值的事实意味着evaluate 可以强制排序:

    return (error "foo")   >> return 1 == return 1
    evaluate (error "foo") >> return 1 == error "foo"
    

    结果是这段代码:

    force s = evaluate $ s `using` rdeepseq
    

    会起作用(与mapM_ evaluate s 具有相同的行为)。


    使用严格通道的情况有点棘手,但我相信这是由于严格并发中的一个错误。昂贵的计算实际上是在工作线程上运行的,但它对您没有多大好处(您可以通过在字符串中隐藏一些异步异常并查看异常出现在哪个线程上来显式检查这一点)。

    什么是错误?我们来看一下strictwriteChan的代码:

    writeChan :: NFData a => Chan a -> a -> IO ()
    writeChan (Chan _read write) val = do
      new_hole <- newEmptyMVar
      modifyMVar_ write $ \old_hole -> do
        putMVar old_hole $! ChItem val new_hole
        return new_hole
    

    在评估 thunk 之前,我们看到 modifyMVar_write 上被调用。那么操作顺序是:

    1. writeChan 已输入
    2. 我们takeMVar write(阻止任何想写信给频道的人)
    3. 我们评估昂贵的 thunk
    4. 我们将昂贵的 thunk 放到频道中
    5. 我们putMVar write,解除所有其他线程的阻塞

    evaluate 变体不会出现这种行为,因为它们会在获取锁之前执行评估。

    我会就此向 Don 发送邮件,看看他是否同意这种行为不是最理想的。

    Don 同意这种行为是次优的。我们正在开发补丁。

    【讨论】:

    • 第二部分非常有趣(仍在思考第一部分)。如此严格的通道在发送线程上进行评估,但首先在通道中进行一种 reservation ?工人可能会按照我想要的顺序完成,但通道会将它们序列化为 FCFS(而不是 FFFS = 先完成,先服务)。谢谢。
    • 附带说明,您并不总是需要 deepseq,它会强制评估列表中的每个元素。通常,它足以强制仅评估列表的脊椎,这可以通过 length xs 'seq' xs 完成