【问题标题】:Exception thrown while iterating in scala在scala中迭代时抛出异常
【发布时间】:2013-06-09 18:13:55
【问题描述】:

我有一个基于 Actor 的系统,在该系统中,我正在读取位于 S3 存储桶中的外部文件,并移动获取每个文件行并将其发送给另一个处理该特定行的 Actor。我很难理解的是在读取文件时抛出异常时会发生什么。

我的代码如下:

import akka.actor._
import akka.actor.ActorSystem

class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging {

  val fileUtils = new S3Utils()

  private def processFile(fileLocation: String): Unit = {
    try{
         fileUtils.getLinesFromLocation(fileLocation).foreach {
         r =>
        {
           //Some processing happens for the line
            }
          }
        }
    }
    }catch{
      case e:Exception => log.error("Issue processing files from the following location %s".format(fileLocation))
    }
  }

  def receive() = {
    case fileLocation: String => {
      processFile(fileLocation)
    }
  }
}

在我的S3Utils 类中,我定义了getLinesFromLocation 方法如下:

 def getLinesFromLocation(fileLocation: String): Iterator[String] = {
    try{
       for {
             fileEntry <- getFileInfo(root,fileLocation)
          } yield fileEntry
    }catch{
      case e:Exception => logger.error("Issue with file location %s:         %s".format(fileLocation,e.getStackTraceString));throw e
    }
  }

我实际读取文件的方法在私有方法getFileInfo中定义

 private def getFileInfo(rootBucket: String,fileLocation: String): Iterator[String] = {
    implicit val codec = Codec(Codec.UTF8)
    codec.onMalformedInput(CodingErrorAction.IGNORE)
    codec.onUnmappableCharacter(CodingErrorAction.IGNORE)
    Source.fromInputStream(s3Client.
                       getObject(rootBucket,fileLocation).
                       getObjectContent()).getLines
  }

我写了上面的文章,假设位于 S3 上的底层文件不会被缓存在任何地方,我将简单地遍历恒定空间中的各个行并处理它们。如果在读取特定行时出现问题,迭代器将继续前进,而不会影响 Actor。

我的第一个问题是,我对迭代器的理解是否正确?实际上,我是否真的从底层文件系统(在本例中为 S3 存储桶)读取行而不对内存施加任何压力/或引入任何内存泄漏。

下一个问题是,如果迭代器在读取单个条目时遇到错误,是终止整个迭代过程还是继续下一个条目。

我的最后一个问题是,我的文件处理逻辑写对了吗?

很高兴能对此有所了解。

谢谢

【问题讨论】:

  • 堆栈跟踪和异常类型 + 消息可能有助于人们了解这里发生了什么。

标签: scala file-io exception-handling iterator actor


【解决方案1】:

看起来亚马逊 s3 没有异步实现,我们被固定的演员卡住了。所以你的实现是正确的,只要你为每个连接分配一个线程并且不会阻塞输入,也不会使用太多的连接。

需要采取的重要步骤:

1) processFile 不应阻塞当前线程。最好它应该将其输入委托给另一个参与者:

 private def processFile(fileLocation: String): Unit = {
     ...
         fileUtils.getLinesFromLocation(fileLocation).foreach {  r =>
            lineWorker ! FileLine(fileLocation, r)
         }

    ...
 }

2) 将FileWorker 设为固定演员:

## in application.config:
my-pinned-dispatcher {
    executor = "thread-pool-executor"
    type = PinnedDispatcher
}

// in the code:  
val fileWorker = context.actorOf(Props(classOf[FileWorker], lineWorker).withDispatcher("my-pinned-dispatcher"), "FileWorker")

如果迭代器在读取单个条目时遇到错误,是否会终止整个迭代过程?

是的,你的整个进程将被终止,演员将从它的邮箱中获取下一个作业。

【讨论】:

  • 谢谢。编写 getLinesFromLocation 是为了在这种特殊情况下将当前行更改为不同的形状,即表示域模型的案例类。阅读您的帖子后,我对当前的设计更加困惑。我的设计目标是扫描整个文件而不将其存储在内存中,并在读取文件时处理各个行条目,我认为 Source.fromInputStream(...).getlines 可以实现。但在这方面我的理解似乎是错误的。有没有办法实现我刚才描述的?
  • 或者不,这只是我愚蠢。迭代器上的映射将返回您的迭代器,您的代码确实会使用很少的内存。对不起!我会尝试在这里放一些异步草图来弥补这一点。
  • 那太好了。非常感谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-07-01
  • 1970-01-01
  • 2023-03-07
  • 2016-02-21
  • 2013-05-24
  • 1970-01-01
  • 2018-10-18
相关资源
最近更新 更多