【问题标题】:Handling exceptions in an iteratee library without an error state在没有错误状态的情况下处理迭代库中的异常
【发布时间】:2012-11-16 19:00:17
【问题描述】:

我正在尝试编写一个枚举器,用于使用Scalaz 7 的 iteratee 库从java.io.BufferedReader 中逐行读取文件,该库目前只为java.io.Reader 提供一个(非常慢的)枚举器。

我遇到的问题与以下事实有关类型的构造函数,而 Scalaz 7 没有。

我目前的实现

这是我目前拥有的。首先是一些导入和IO 包装器:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }

def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())

还有一个类型别名来清理一下:

type ErrorOr[A] = Either[Throwable, A]

现在是 tryIO 助手,模仿(松散地,可能是错误地)enumerator 中的助手:

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

BufferedReader 本身的枚举器:

def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
  lazy val reader = r
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(readLine(reader)) flatMap {
      case Right(None)       => s.pointI
      case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
      case Left(e)           => k(I.elInput(Left(e)))
    }
  )
}

最后是一个负责打开和关闭阅读器的枚举器:

def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(openFile(f)) flatMap {
      case Right(reader) => I.iterateeT(
        enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
      )
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}

现在假设我想将文件中包含至少 25 个'0' 字符的所有行收集到一个列表中。我会写:

val action: IO[ErrorOr[List[String]]] = (
  I.consume[ErrorOr[String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
  enumFile(new File("big.txt"))
).run.map(_.sequence)

在许多方面,这似乎工作得很好:我可以使用unsafePerformIO 开始操作,它会在几分钟内将数千万行和千兆字节的数据分块,存储在恒定的内存中,并且不会破坏堆栈,然后在完成后关闭阅读器。如果我给它一个不存在的文件的名称,它会尽职尽责地将包含在 Left 中的异常返回给我,并且如果在读取时遇到异常,enumBuffered 至少似乎表现得适当。

潜在问题

不过,我对自己的实现有些担忧——尤其是tryIO。例如,假设我尝试编写几个迭代器:

val it = for {
  _ <- tryIO[Unit, Unit](IO(println("a")))
  _ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
  r <- tryIO[Unit, Unit](IO(println("b")))
} yield r

如果我运行它,我会得到以下信息:

scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())

如果我在 GHCi 中使用 enumerator 尝试相同的操作,结果会更符合我的预期:

...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !

我只是看不到在 iteratee 库本身中没有错误状态的情况下获得此行为的方法。

我的问题

我并不声称自己是迭代器方面的专家,但我在几个项目中使用了各种 Haskell 实现,感觉我或多或少地了解了基本概念,并且曾与 Oleg 喝过咖啡。不过,我在这里不知所措。这是在没有错误状态的情况下处理异常的合理方法吗?有没有办法实现tryIO,它的行为更像enumerator 版本?由于我的实现行为不同,是否有某种定时炸弹在等着我?

【问题讨论】:

  • 无论如何我不会给你一个好的答案,但我很好奇:你的性能和/或抽象目标是什么?大概你关心一些性能,或者Reader 会很好;并且大概您关心一些抽象,或者您会放弃这种开销并使用更紧凑且可能性能更好的策略(对于简单且不需要组合的情况)。例如,Try(closing(io.Source.fromFile("big.txt")){_.getLines.filter(_.count(_=='0') &gt;= 25)}.toList)closing 具有明显的三行定义。
  • @RexKerr:Scalaz 附带的Reader 枚举器的性能非常糟糕——实际上是我在这里实现的枚举器的几十倍。但是我很高兴能够为例如flatMap(enumFile) 列出目录中的文件并获取所有这些文件的行的枚举器的枚举器提供便利,而无需担心显式关闭任何资源(iteratee 方法允许)。
  • 如果您可以为当前示例提供一个完整的工作要点,我可以尝试将其转换为我在答案中写的内容。
  • @IvanMeredith:谢谢!这是the gist,我已经针对scalaz-seven 分支的当前负责人进行了测试。
  • 记录在此my solution

标签: scala haskell io scalaz iterate


【解决方案1】:

在这里编辑是真正的解决方案。我离开了原来的帖子,因为我认为它值得看到这种模式。适用于 Klesli 的方法适用于 IterateeT

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
  def readLine(r: BufferedReader) = IO(Option(r.readLine))
  def closeReader(r: BufferedReader) = IO(r.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT.fromEither(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readLine(reader)) flatMap {
        case None => s.pointI
        case Some(line) => k(I.elInput(line)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
    def apply[A] = (s: StepT[String, ErrorOr, A]) => 
      tryIO(openFile(f)).flatMap(reader => I.iterateeT[String, ErrorOr, A](
        EitherT(
          enumBuffered(reader).apply(s).value.run.ensuring(closeReader(reader)))))
  }

  def main(args: Array[String]) {
    val action = (
      I.consume[String, ErrorOr, List] %=
      I.filter(a => a.count(_ == '0') >= 25) &=
      enumFile(new File(args(0)))).run.run

    println(action.unsafePerformIO().map(_.size))
  }
}

===== 原帖=====

我觉得你需要一个 EitherT。如果没有 EitherT,您最终只会得到 3 个左或右。使用 EitherT 它将对左进行适当的处​​理。

我认为你真正想要的是

type ErrorOr[+A] = EitherT[IO, Throwable, A] 
I.iterateeT[A, ErrorOr, B]

以下代码模拟了您当前的组合方式。因为IterateeT没有左右的概念,所以当你组合它的时候,你只会得到一堆IO/Id。

scala> Kleisli((a:Int) => 4.right[String].point[Id])
res11: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@73e771ca

scala> Kleisli((a:Int) => "aa".left[Int].point[Id])
res12: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@be41b41

scala> for { a <- res11; b <- res12 } yield (a,b)
res15: scalaz.Kleisli[scalaz.Scalaz.Id,Int,(scalaz.\/[String,Int], scalaz.\/[String,Int])] = scalaz.KleisliFunctions$$anon$18@42fd1445

scala> res15.run(1)
res16: (scalaz.\/[String,Int], scalaz.\/[String,Int]) = (\/-(4),-\/(aa))

在下面的代码中,我们没有使用 Id,而是使用了 EitherT。由于 EitherT 与 Either 具有相同的绑定行为,因此我们最终得到了我们想要的。

scala>  type ErrorOr[+A] = EitherT[Id, String, A]
defined type alias ErrorOr

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT(4.right[String].point[Id]))
res22: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@58b547a0

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT("aa".left[Int].point[Id]))
res24: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@342f2ceb

scala> for { a <- res22; b <- res24 } yield 2
res25: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@204eab31

scala> res25.run(2).run
res26: scalaz.Scalaz.Id[scalaz.\/[String,Int]] = -\/(aa)

您可以将 Keisli 替换为 IterateeT 并将 Id 替换为 IO 以获得您需要的内容。

【讨论】:

  • 谢谢——我今天早上在这里短暂地使用了EitherT,但无法使类型正确,但我现在又试了一次。
  • 我现在也在转换类型。
【解决方案2】:

pipes 的做法是使用 Channel 类型类进行类型组合:

class Channel p where
    {-| 'idT' acts like a \'T\'ransparent proxy, passing all requests further
        upstream, and passing all responses further downstream. -}
    idT :: (Monad m) => a' -> p a' a a' a m r

    {-| Compose two proxies, satisfying all requests from downstream with
        responses from upstream. -}
    (>->) :: (Monad m)
          => (b' -> p a' a b' b m r)
          -> (c' -> p b' b c' c m r)
          -> (c' -> p a' a c' c m r)
    p1 >-> p2 = p2 <-< p1

... 并从基础组合中推导出 EitherT 上的提升组合。这是pipes-2.4 中介绍的代理转换器原理的一个特例,它允许在任意扩展上提升组合。

这种提升需要在Control.Proxy.Trans.Either 中定义一个专用于Proxy 形状的EitherT

newtype EitherP e p a' a b' b (m :: * -> *) r
  = EitherP { runEitherP :: p a' a b' b m (Either e r) }

这种对Proxy 形状的特化是必要的,以便能够定义Channel 类的类型良好的实例。 Scala 在这方面可能比 Haskell 更灵活。

然后,我只需重新定义 Monad 实例(和其他实例)以及此专用类型的所有普通 EitherT 操作:

throw :: (Monad (p a' a b' b m)) => e -> EitherP e p a' a b' b m r
throw = EitherP . return . Left

catch
 :: (Monad (p a' a b' b m))
 => EitherP e p a' a b' b m r        -- ^ Original computation
 -> (e -> EitherP f p a' a b' b m r) -- ^ Handler
 -> EitherP f p a' a b' b m r        -- ^ Handled computation
catch m f = EitherP $ do
    e <- runEitherP m
    runEitherP $ case e of
        Left  l -> f     l
        Right r -> right r

有了这个,我可以定义以下提升的合成实例:

-- Given that 'p' is composable, so is 'EitherP e p'
instance (Channel p) => Channel (EitherP e p) where
    idT = EitherP . idT
    p1 >-> p2 = (EitherP .) $ runEitherP . p1 >-> runEitherP . p2

要了解那里发生了什么,只需遵循类型:

p1 :: b' -> EitherP e p a' a b' b m r
p2 :: c' -> EitherP e p b' b c' c m r

runEitherP . p1 :: b' -> p a' a b' b m (Either e r)
runEitherP . p2 :: c' -> p b' b c' c m (Either e r)

-- Use the base composition for 'p'
runEitherP . p1 >-> runEitherP . p2
 :: c' -> p a' a c' c m (Either e r)

-- Rewrap in EitherP
(EitherP . ) $ runEitherP . p1 >-> runEitherP . p2
 :: c' -> EitherP e p a' a c' c m r

这使您可以在特定阶段内抛出和捕获错误,而不会中断其他阶段。这是我从pipes-2.4 公告帖子中复制并粘贴的示例:

import Control.Monad (forever)
import Control.Monad.Trans (lift)
import Control.Proxy
import Control.Proxy.Trans.Either as E
import Safe (readMay)

promptInts :: () -> EitherP String Proxy C () () Int IO r
promptInts () = recover $ forever $ do
    str <- lift getLine
    case readMay str of
        Nothing -> E.throw "Could not parse an integer"
        Just n  -> liftP $ respond n

recover p =
    p `E.catch` (\str -> lift (putStrLn str) >> recover p)

main = runProxy $ runEitherK $ mapP printD <-< promptInts

结果如下:

>>> main
1<Enter>
1
Test<Enter>
Could not parse an integer
Apple<Enter>
Could not parse an integer
5<Enter>
5

迭代方法的答案是相似的。您必须采用现有的方式来编写迭代器并将其提升到EitherT。是使用类型类还是仅仅定义一个新的组合操作符取决于你。

其他一些有用的链接:

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-05-03
    • 2011-10-05
    • 1970-01-01
    • 1970-01-01
    • 2021-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多