【发布时间】:2013-06-09 18:13:55
【问题描述】:
我有一个基于 Actor 的系统,在该系统中,我正在读取位于 S3 存储桶中的外部文件,并移动获取每个文件行并将其发送给另一个处理该特定行的 Actor。我很难理解的是在读取文件时抛出异常时会发生什么。
我的代码如下:
import akka.actor._
import akka.actor.ActorSystem
class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging {
val fileUtils = new S3Utils()
private def processFile(fileLocation: String): Unit = {
try{
fileUtils.getLinesFromLocation(fileLocation).foreach {
r =>
{
//Some processing happens for the line
}
}
}
}
}catch{
case e:Exception => log.error("Issue processing files from the following location %s".format(fileLocation))
}
}
def receive() = {
case fileLocation: String => {
processFile(fileLocation)
}
}
}
在我的S3Utils 类中,我定义了getLinesFromLocation 方法如下:
def getLinesFromLocation(fileLocation: String): Iterator[String] = {
try{
for {
fileEntry <- getFileInfo(root,fileLocation)
} yield fileEntry
}catch{
case e:Exception => logger.error("Issue with file location %s: %s".format(fileLocation,e.getStackTraceString));throw e
}
}
我实际读取文件的方法在私有方法getFileInfo中定义
private def getFileInfo(rootBucket: String,fileLocation: String): Iterator[String] = {
implicit val codec = Codec(Codec.UTF8)
codec.onMalformedInput(CodingErrorAction.IGNORE)
codec.onUnmappableCharacter(CodingErrorAction.IGNORE)
Source.fromInputStream(s3Client.
getObject(rootBucket,fileLocation).
getObjectContent()).getLines
}
我写了上面的文章,假设位于 S3 上的底层文件不会被缓存在任何地方,我将简单地遍历恒定空间中的各个行并处理它们。如果在读取特定行时出现问题,迭代器将继续前进,而不会影响 Actor。
我的第一个问题是,我对迭代器的理解是否正确?实际上,我是否真的从底层文件系统(在本例中为 S3 存储桶)读取行而不对内存施加任何压力/或引入任何内存泄漏。
下一个问题是,如果迭代器在读取单个条目时遇到错误,是终止整个迭代过程还是继续下一个条目。
我的最后一个问题是,我的文件处理逻辑写对了吗?
很高兴能对此有所了解。
谢谢
【问题讨论】:
-
堆栈跟踪和异常类型 + 消息可能有助于人们了解这里发生了什么。
标签: scala file-io exception-handling iterator actor