答案
最简单的解决方案是使用Client 和Server,正如 danidiaz 在 cmets 中建议的那样,因为pipes 没有任何内置支持循环管道,如果不是不可能的话,这将是非常困难的正确地这样做。这主要是因为我们需要处理awaits 的数量与yields 的数量不匹配的情况。
编辑:我添加了一个关于另一个答案的问题的部分。请参阅“另一个有问题的替代方案”部分
编辑 2: 我在下面添加了一个问题较少的可能解决方案。请参阅“可能的解决方案”部分
一个有问题的替代方案
然而,可以借助Proxy 框架(使用Client 和Server)和简洁的函数generalize 来模拟它,它将单向Pipe 变成双向Proxy .
generalize f x0
+-----------+ +---------------------+
| | | |
| | x <======================== x
a ==> f ==> b becomes | |
| | a ==> f ==> b
| | | | |
+-----|-----+ +----------|----------+
v v
r r
现在我们可以使用//> 和>\\ 来堵住两端,让流循环:
loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\\ generalize p x0 //> pure
这个形状
loop f
a
+-----|-----+
| | |
/====<=======/===<========\
| | | |
\=> a ==> f ==> a ==/
| |
+-----|-----+
v
r
如您所见,我们需要为a 输入一个初始值。这是因为无法保证管道在产生之前不会await,这将迫使它永远等待。
但是请注意,如果管道在awaiting 之前多次使用管道yields,这将丢弃数据,因为 generalize 是在内部使用状态单子实现的,该单子在产生和等待时检索最后一个值。
使用(有问题的想法)
要将它与您的管道一起使用,只需将它们组合起来并将它们交给loop:
runEffect $ loop (f >-> g)
但请不要使用,一不小心会随机丢掉数据
另一个有问题的替代方案
你也可以像 mingmingrr 建议的那样制作一个懒惰的无限管道链
infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f
这解决了丢弃/重复值的问题,但还有其他几个问题。首先是在屈服之前先等待会导致无限循环使用无限内存,但这已经在 mingmingrr 的答案中得到解决。
另一个更难解决的问题是,在相应的 yield 之前的每个操作都会为每个 await 复制一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
lift . putStrLn $ "Awaiting. n = " ++ show n
x <- await
lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
yield (x + 1)
g (n - 1)
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
现在,运行 runEffect (cyclic' 0 >-> P.print) 将打印以下内容:
Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63
如您所见,对于每个await,我们重新执行所有操作,直到对应的yield。更具体地说,a await 触发管道的新副本运行,直到它达到产量。当我们再次 await 时,副本将运行到下一次 yield,如果它在此期间触发 await,它将创建另一个副本并运行它直到第一次 yield,依此类推。
这意味着在最好的情况下,我们得到O(n^2) 而不是线性性能(并且使用O(n) 而不是O(1) 内存),因为我们为每个动作重复一切。在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。
一个可能的解决方案
如果您确实必须使用Pipes 并且不能使用request/respond,并且您确定您的代码永远不会await 超过(或之前)它yields(或在这些情况下有一个很好的默认值),我们可以在我之前的尝试的基础上制定一个解决方案,至少可以处理yielding 比你await 更多的情况。
诀窍是在generalize 的实现中添加一个缓冲区,这样多余的值就会被存储而不是被丢弃。我们还可以将额外参数保留为缓冲区为空时的默认值。
import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence
generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\\ hoist lift p //> dn
where
up () = do
x <- lift $ state (takeHeadDef x0)
request x
dn a = do
x <- respond a
lift $ modify (Seq.|> x)
takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)
如果我们现在将其插入到loop 的定义中,我们将解决丢弃多余值的问题(以保留缓冲区的内存成本为代价)。它还可以防止重复默认值以外的任何值,并且仅在缓冲区为空时使用默认值。
loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\\ generalize' p x0 //> pure
如果我们想让awaiting 在yielding 之前出现错误,我们可以简单地将error 作为我们的默认值:loop' (error "Await without yield") somePipe。
TL;DR
使用来自Pipes.Core 的Client 和Server。它将解决您的问题,并且不会导致大量奇怪的错误。
如果这不可能,我的“可能的解决方案”部分和generalize 的修改版本在大多数情况下应该可以完成这项工作。