【问题标题】:Composing Pipes into a loop or cycle in haskell在haskell中将管道组成一个循环或循环
【发布时间】:2020-04-04 09:40:03
【问题描述】:

这个问题是关于 Haskell 库 Pipes

此问题与2019年Advent of CodeDay 11有关(可能有剧透警告)

我有两个 Pipe Int Int m r brainrobot 需要在连续循环中相互传递信息。也就是brain的输出需要去robot的输入,robot的输出需要去brain的输入。当brain 完成后,我需要计算结果。

如何将brainrobot 组合成一个循环?理想情况下,我可以将 Effect m r 类型的循环传递给 runEffect

编辑:结果应如下所示:

   +-----------+     +-----------+   
   |           |     |           |   
   |           |     |           |   
a ==>    f    ==> b ==>    g    ==> a=|
^  |           |     |           |    |
|  |     |     |     |     |     |    |
|  +-----|-----+     +-----|-----+    |
|        v                 v          |
|        ()                r          |
+=====================================+

【问题讨论】:

  • “来回传递信息”。 Pipes 是单向的。如果您需要双向性,也许您可​​以使用ClientServer 类型以及Pipes.Corehackage.haskell.org/package/pipes-4.3.13/docs/Pipes-Core.html 中的requestrespond(+>>)(>>~) 函数。
  • 抱歉不清楚。信息只向一个方向流动,但它围绕一个闭环流动。我正在尝试弄清楚如何关闭循环。
  • 我认为您需要决定当awaits 的数量与yields 的数量不匹配时会发生什么。对此没有好的解决方案。 requestrespond 的替代方案通过强制它们相同来解决这个问题。

标签: haskell composition haskell-pipes


【解决方案1】:

答案

最简单的解决方案是使用ClientServer,正如 danidiaz 在 cmets 中建议的那样,因为pipes 没有任何内置支持循环管道,如果不是不可能的话,这将是非常困难的正确地这样做。这主要是因为我们需要处理awaits 的数量与yields 的数量不匹配的情况。

编辑:我添加了一个关于另一个答案的问题的部分。请参阅“另一个有问题的替代方案”部分

编辑 2: 我在下面添加了一个问题较少的可能解决方案。请参阅“可能的解决方案”部分

一个有问题的替代方案

然而,可以借助Proxy 框架(使用ClientServer)和简洁的函数generalize 来模拟它,它将单向Pipe 变成双向Proxy .

                                       generalize f x0
   +-----------+                   +---------------------+
   |           |                   |                     |
   |           |                x <======================== x
a ==>    f    ==> b   becomes      |                     |
   |           |                a ==>         f         ==> b
   |     |     |                   |                     |
   +-----|-----+                   +----------|----------+
         v                                    v     
         r                                    r     

现在我们可以使用//&gt;&gt;\\ 来堵住两端,让流循环:

loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\\ generalize p x0 //> pure

这个形状

            loop f

              a 
        +-----|-----+
        |     |     |
 /====<=======/===<========\
 |      |           |      |
 \=> a ==>    f    ==> a ==/
        |           |
        +-----|-----+
              v    
              r    

如您所见,我们需要为a 输入一个初始值。这是因为无法保证管道在产生之前不会await,这将迫使它永远等待。

但是请注意,如果管道在awaiting 之前多次使用管道yields,这将丢弃数据,因为 generalize 是在内部使用状态单子实现的,该单子在产生和等待时检索最后一个值。

使用(有问题的想法)

要将它与您的管道一起使用,只需将它们组合起来并将它们交给loop

runEffect $ loop (f >-> g)

但请不要使用,一不小心会随机丢掉数据

另一个有问题的替代方案

你也可以像 mingmingrr 建议的那样制作一个懒惰的无限管道链

infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f

这解决了丢弃/重复值的问题,但还有其他几个问题。首先是在屈服之前先等待会导致无限循环使用无限内存,但这已经在 mingmingrr 的答案中得到解决。

另一个更难解决的问题是,在相应的 yield 之前的每个操作都会为每个 await 复制一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:

import Pipes
import qualified Pipes.Prelude as P

f :: Monad m => Pipe Int Int m r
f = P.map (* 2)

g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
  lift . putStrLn $ "Awaiting. n = " ++ show n
  x <- await
  lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
  yield (x + 1)
  g (n - 1)

cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe

现在,运行 runEffect (cyclic' 0 &gt;-&gt; P.print) 将打印以下内容:

Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63

如您所见,对于每个await,我们重新执行所有操作,直到对应的yield。更具体地说,a await 触发管道的新副本运行,直到它达到产量。当我们再次 await 时,副本将运行到下一次 yield,如果它在此期间触发 await,它将创建另一个副本并运行它直到第一次 yield,依此类推。

这意味着在最好的情况下,我们得到O(n^2) 而不是线性性能(并且使用O(n) 而不是O(1) 内存),因为我们为每个动作重复一切。在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。

一个可能的解决方案

如果您确实必须使用Pipes 并且不能使用request/respond,并且您确定您的代码永远不会await 超过(或之前)它yields(或在这些情况下有一个很好的默认值),我们可以在我之前的尝试的基础上制定一个解决方案,至少可以处理yielding 比你await 更多的情况。

诀窍是在generalize 的实现中添加一个缓冲区,这样多余的值就会被存储而不是被丢弃。我们还可以将额外参数保留为缓冲区为空时的默认值。

import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence

generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\\ hoist lift p //> dn
  where
    up () = do
        x <- lift $ state (takeHeadDef x0)
        request x
    dn a = do
        x <- respond a
        lift $ modify (Seq.|> x)
    takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
    takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)

如果我们现在将其插入到loop 的定义中,我们将解决丢弃多余值的问题(以保留缓冲区的内存成本为代价)。它还可以防止重复默认值以外的任何值,并且仅在缓冲区为空时使用默认值。

loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\\ generalize' p x0 //> pure

如果我们想让awaiting 在yielding 之前出现错误,我们可以简单地将error 作为我们的默认值:loop' (error "Await without yield") somePipe

TL;DR

使用来自Pipes.CoreClientServer。它将解决您的问题,并且不会导致大量奇怪的错误。

如果这不可能,我的“可能的解决方案”部分和generalize 的修改版本在大多数情况下应该可以完成这项工作。

【讨论】:

【解决方案2】:

您可以通过将管道的输出绑定到输入来制作循环管道。

cyclic :: Functor m => Producer a m r
cyclic = cyclic >-> f >-> g

考虑以下示例:

import Pipes
import qualified Pipes.Prelude as P

f :: Monad m => Pipe Int Int m r
f = P.map (* 2)

g :: Monad m => Int -> Pipe Int Int m Int
g 0 = return 100
g n = do x <- await ; yield (x + 1) ; g (n - 1)

由于fg 在等待之前都不会产生任何输出,因此使用cyclic = cyclic &gt;-&gt; f &gt;-&gt; g 将导致f 永远等待。避免这种情况的关键是确保 fg 在等待之前产生一些东西,或者将初始输入输入到第一个管道,如下所示:

cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe

在这里运行runEffect (cyclic' 0 &gt;-&gt; P.print) 会给出return 100 并打印1 3 7 15 31 63

附: (可能是 Advent of Code 2019 剧透)您可以使用相同的方案来完成第 7 天。如果您的 Intcode 计算机的类型为 StateT IntcodeState (Pipe Int Int m),那么您可以使用 replicate 5 (evalState runIntcode initialIntcodeState) 来获得对应于 5 个放大器中的每一个的 5 个管道。

【讨论】:

  • 在循环中使用无限的管道链是否有任何性能损失?如果fg 是有状态的会发生什么?另外,我认为cyclic'yield input &gt;&gt; cyclic' 中缺少一个参数。
  • 它更像是一个封闭的管道循环,而不是一个无限的管道链,所以性能应该类似于fibs = 0 : 1 : zipWith (+) fibs (tail fibs)fg 仍然可以是有状态的,因为除了 Monad m 之外,m 上没有任何约束。
  • 这是一个例子:说f = catg = do {lift $ putStrLn "hello"; yield "a"; await}。这将打印两次“hello”,尽管只产生一个值。这是故意的吗?
  • 啊,在第一次 yield 被执行两次之前似乎什么都没有,所以将 g 重新排序为 g = do { yield "a" ; lift (putStrLn "hello") ; await } 使它只打印一次“hello”。
  • 是的,完全正确。问题是你实际上并没有创建一个循环,只是一个潜在的无限管道链,只要链的一部分需要更多值,就可以调用它。第二次 yield 之前的所有内容也将执行两次。在这种情况下,第一次 yield 之前的所有内容都会执行 3 次。
猜你喜欢
  • 1970-01-01
  • 2021-09-09
  • 2023-03-10
  • 1970-01-01
  • 1970-01-01
  • 2021-12-13
  • 1970-01-01
  • 1970-01-01
  • 2013-05-14
相关资源
最近更新 更多