【问题标题】:What's an idiomatic way of handling a lazy input channel in Haskell在 Haskell 中处理惰性输入通道的惯用方法是什么
【发布时间】:2015-03-19 06:59:39
【问题描述】:

我正在实现一个 IRC 机器人,因为我使用 OpenSSL.Session 通过 SSL 进行连接,所以我使用 lazyRead 函数从套接字读取数据。在连接的初始阶段,我需要按顺序执行几件事:昵称协商、昵称服务识别、加入频道等)因此涉及一些状态。现在我想出了以下内容:

data ConnectionState = Initial | NickIdentification | Connected

listen :: SSL.SSL -> IO ()
listen ssl = do
  lines <- BL.lines `fmap` SSL.lazyRead ssl
  evalStateT (mapM_ (processLine ssl) lines) Initial

processLine :: SSL.SSL -> BL.ByteString -> StateT ConnectionState IO ()
processLine ssl line = do case message of
                            Just a -> processMessage ssl a
                            Nothing -> return ()
  where message = IRC.decode $ BL.toStrict line

processMessage :: SSL.SSL -> IRC.Message -> StateT ConnectionState IO ()
processMessage ssl m = do
    state <- S.get
    case state of
      Initial -> when (IRC.msg_command m == "376") $ do
        liftIO $ putStrLn "connected!"
        liftIO $ privmsg ssl "NickServ" ("identify " ++ nick_password)
        S.put NickIdentification
      NickIdentification -> do
        when (identified m) $ do
          liftIO $ putStrLn "identified!"
          liftIO $ joinChannel ssl chan
          S.put Connected
      Connected -> return ()
    liftIO $ print m
    when (IRC.msg_command m == "PING") $ (liftIO . pong . mconcat . map show) (IRC.msg_params m)

因此,当我进入“已连接”状态时,我仍然会执行 case 语句,即使它只是真正需要初始化连接。另一个问题是添加嵌套的 StateT 会非常痛苦。

另一种方法是将mapM 替换为自定义的东西,以便仅处理行,直到我们连接,然后在其余部分开始另一个循环。这将需要跟踪列表中剩余的内容或再次调用SSL.lazyRead(这还不错)。

另一种解决方案是将剩余线条列表保持在状态并在需要时绘制线条,类似于getLine

在这种情况下,最好的做法是什么? Haskell 的懒惰会让我们在状态停止更新后直接转到Connected 案例还是case 总是严格的?

【问题讨论】:

  • 另一种选择是使用conduit

标签: haskell state monads irc


【解决方案1】:

您可以使用pipes 中的Pipe 类型。诀窍在于,您可以在Pipe 的控制流中隐式编码状态,而不是创建状态机和转换函数。

这是Pipe 的样子:

stateful :: Pipe ByteString ByteString IO r
stateful = do
    msg <- await
    if (IRC.msg_command msg == "376")
        then do
            liftIO $ putStrLn "connected!"
            liftIO $ privmsg ssl "NickServ" ("identify " ++ nick_password)
            yield msg
            nick
        else stateful

nick :: Pipe ByteString ByteString IO r
nick = do
    msg <- await
    if identified msg
        then do
            liftIO $ putStrLn "identified!"
            liftIO $ joinChannel ssl chan
            yield msg
            cat  -- Forward the remaining input to output indefinitely 
        else nick

stateful 管道对应于 processMessage 函数的有状态部分。它处理初始化和身份验证,但通过重新yielding msg 将进一步的消息处理推迟到下游阶段。

然后,您可以使用for 循环遍历Pipe yields 的每条消息:

processMessage :: Consumer ByteString IO r
processMessage = for stateful $ \msg -> do
    liftIO $ print m
    when (IRC.msg_command m == "PING") $ (liftIO . pong . mconcat . map show) (IRC.msg_params m)

现在您只需要一个ByteString 行的来源,以提供给processMessage。您可以使用以下Producer

lines :: Producer ByteString IO ()
lines = do
    bs <- liftIO (ByteString.getLine)
    if ByteString.null bs
        then return ()
        else do
            yield bs
            lines

然后您可以将lines 连接到processMessage 并运行它们:

runEffect (lines >-> processMessage) :: IO ()

注意lines Producer 不使用惰性IO。即使你使用严格的ByteString 模块它也可以工作,但整个程序的行为仍然是惰性的。

如果您想详细了解pipes 的工作原理,可以阅读the pipes tutorial

【讨论】:

  • 非常感谢,现在试用 Pipes,它们看起来棒极了!是否等同于在processMessage 中使用await 然后执行lines &gt;-&gt; stateful &gt;-&gt; processMessage?另外,处理生产者中发生的异常的最佳方法是什么,以便其他管道有机会优雅地清理?例如,我可能想在stateful 中生成一些线程,但我想在消费者点击 EOF 时杀死它们。
  • 我想我应该看看 Pipes.Safe。
  • 另外,我想先处理 ping 消息,所以我切换有状态和 processMessage 位置,但是每当它完全完成注册时,我必须在有状态中执行forever await 之类的操作。从某种意义上说,这很糟糕吗?
  • 我发现使用序列可以使这个状态机更加直观:runEffect (messages con &gt;-&gt; handlePing con &gt;-&gt; sequence_ [nickNegotiation con, nickServId con, forever await]),我现在可以更改事物的顺序。 (这些消费者现在在完成处理时返回)
  • 是的,pipes-safe 处理与资源清理相关的任何事情。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-01-28
  • 2019-02-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多