【问题标题】:Concurrent Thread Manager并发线程管理器
【发布时间】:2013-01-27 23:48:17
【问题描述】:

我有以下并发线程管理器的实现

newtype Query = Query String

type ThreadWorker = (Query, ThreadStatus)
data ThreadStatus = Running | Finished | Threw IOException

newtype ThreadManager = Manager (MVar (M.Map ThreadId (MVar ThreadWorker)

我想写manageWorkers :: ThreadManager -> IO () 遍历Map 并查看ThreadWorkerThreadStatus

如果ThreadWorker 已完成,则它会从ThreadManager 中删除。如果抛出了异常,那么应该处理它(打印到标准输出就可以了)并且应该派生一个新线程来处理查询(假设存在一个函数runQuery :: Query -> IO a)并添加到@ 987654329@,否则线程仍在运行,应该不理会。​​p>

我的第一次尝试是:

manageWorkers :: ThreadManager -> IO ()
manageWorkers (Manager mgr) =
    modifyMVar mgr $ \m -> do
        m' <- M.traverseWithKey manageWorker m
        return (m', ())
where manageWorker :: ThreadId -> MVar ThreadWorker -> IO (MVar ThreadWorker)
      manageWorker tid wkr = tryTakeMVar wkr >>= \mwkr ->
          case mwkr of
               Just (_, Finished) -> undefined -- need to delete this finished ThreadWorker
               Just (q, Threw e ) -> do
                   putStrLn ("[ERROR] " ++ show e)
                   tid' <- forkIO $ runQuery q
                   undefined -- need to add new ThreadWorker
               Just r -> newMVar r
               _ -> newEmptyMVar

但后来我卡住了,在manageWorker 中似乎无法从ThreadManager 中删除或添加。我不确定是否可以从traverse-like 函数中执行我想要的操作。

是否可以使用我的ThreadManager 来实现这个manageWorkers 函数,或者那里有更好的抽象?

编辑:在 ThomasM.DuBuisson 建议使用折叠时,我现在有以下内容

manageWorkers (Manager mgr) =
    modifyMVar mgr $ \m ->
    return (M.foldrWithKey manageWorker M.empty m, ())
where manageWorker :: ThreadId -> MVar ThreadWorker -> M.Map ThreadId (MVar ThreadWorker)
                      -> IO (M.Map ThreadId (MVar ThreadWorker))
      manageWorker tid wkr ts = tryTakeMVar wkr >>= \mwkr ->
          case mwkr of
              Just (q, Threw e) -> do
                  putStrLn ("[ERROR] " ++ show e)
                  wkr' <- newEmptyMVar
                  tid' <- forkIO $ runQuery q
                  return $ M.insert tid' wkr' ts
              Just (_, Running) -> return $ M.insert tid wkr
              _ -> return ts

唯一的问题是manageWorker 的签名显然不适用于M.foldrWithKey。我需要M.foldrWithKeyM :: Monad m =&gt; (k -&gt; a -&gt; b -&gt; m b) -&gt; b -&gt; M.Map k a -&gt; m b,但这样的东西不存在,而且我自己编写时遇到了麻烦。

显然,我可以使用unsafePerformIO 来逃避 IO monad 并满足编译器的要求,但我只会将其用作最后的手段。在这种情况下使用unsafePerformIO 有意义吗?

【问题讨论】:

  • 如果你是takeMVar mgr,那么你应该能够毫无问题地遍历工人和putMVar mgr newMgr
  • @ThomasM.DuBuisson:modifyMVar 不只是对takeMVar/putMvar 模式的抽象吗?
  • 是的,但是如果您无法看到如何将修改后的状态传递给每个 manageWorker(扫描/折叠样式)以在 modifyMVar 中使用,那么也许显式使用 MVar 来保存状态会让您受益开始了。请注意,这是一条评论,并非完整的答案。

标签: haskell concurrency


【解决方案1】:

在我看来,您的问题与并发无关。
您只想遍历一个映射,同时从映射中删除一些键或在映射中插入一些键并执行一些IO 操作。
问题是traverseWithKey 无法从映射中删除键或在映射中插入键,foldrWithKey 无法执行IO 操作。
你需要一个M.foldrWithKeyM :: Monad m =&gt; (k -&gt; a -&gt; b -&gt; m b) -&gt; b -&gt; M.Map k a -&gt; m b
确实,这样的事情是不存在的。但是,如果您查看foldrWithKey 的文档,其中指出:

foldrWithKey f z == foldr (uncurry f) z . toAscList. 

如果我们将foldr 复制为foldM,我们可以猜测这样的M.foldrWithKeyM 可以组成。

以下是我的解决方案。

manageWorkers :: ThreadManager -> IO ()
manageWorkers (Manager mgr) = 
    modifyMVar mgr $ \m -> do
        m' <- foldM manageWorker m (M.toList m)
        return (m', ())
  where manageWorker :: M.Map ThreadId (MVar ThreadWorker) -> (ThreadId, MVar ThreadWorker) -> IO (M.Map ThreadId (MVar ThreadWorker))
        manageWorker ts (tid, wkr) = tryTakeMVar wkr >>= \mwkr ->
            case mwkr of
                 Just (_, Finished) -> return $ M.delete tid ts -- need to delete this finished ThreadWorker
                 Just (q, Threw e ) -> do
                       putStrLn ("[ERROR] " ++ show e)
                       wkr' <- newEmptyMVar
                       tid' <- forkIO $ runQuery q
                       return $ M.insert tid' wkr' ts -- need to add new ThreadWorker
                 _ -> return ts

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-08-01
    • 1970-01-01
    • 2014-02-12
    • 1970-01-01
    • 2023-03-22
    • 2021-05-10
    • 2011-07-12
    • 2010-12-29
    相关资源
    最近更新 更多