【发布时间】:2018-06-03 13:19:44
【问题描述】:
我正在构建一个实用程序,该实用程序可以监视并保留在更大系统中正在处理的文件的进度。该文件是一个大的“文本”文件,.csv、.xls、.txt 等。这可能是来自 Kafka 的流数据,将其写入 Avro,或者将其批量写入 SQL DB。我正在尝试构建一个“catchall”实用程序,它记录处理的行数并使用 RESTful API 调用将进度保存到数据库。
无论处理类型如何,处理总是在 Akka Actor 内完成。我正在尝试异步记录进度,以避免阻塞处理进度。进展非常迅速。大部分都以类似的批处理格式发生,尽管有时它会一个一个地递增,这里是处理过程中会发生什么的基本表示,仅用于演示:
//inside my processing actor
var fileIsProcessing = true
val allLines = KafkaUtil.getConnect(fileKey)
val totalLines = KafkaUtil.getSize
val batchSize = 500
val dBUtil = new DBUtil(totalLines)
while (fileIsProcessing) {
// consumes @ 500 lines at a time to process, returns empty if done consuming
val batch:List[Pollable] = allLines.poll
//for batch identification purposes
val myMax = batch.map(_.toInt ).max
println("Starting new batch with max line: " + myMax)
//processing work happens here
batch.map(processSync)
println("Finished processing batch with max line: " + myMax)
//send a progress update to be persisted to the DB
val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)}
progressCall.onComplete{
case Success(s) => // don't care
case Failure(e) => logger.error("Unable to persist progress from actor ")
}
if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional.
}
还有,我的 DBUtil 的简单表示,即进行处理的类:
class DBUtil(totalLines:Int) {
//store both the number processed and the total to process in db, even if there is currently a percentage
var rate = 0 //lines per second
var totalFinished = 0
var percentageFin:Double = 0
var lastUpdate = DateTime.now()
def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = {
//simulate write the data and calculated progress percentage to db
rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000)
totalFinished += totalProcessed
percentageFin = (totalFinished.toDouble / totalLines.toDouble) * 100
println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate" )
}
}
现在,真正奇怪的是,在生产中,处理过程发生得如此之快,以至于Future[Unit] { dBUtil.incrementProgress(batch.size)} 行并不是每次都能可靠地调用。 while 循环将完成,但我会在我的数据库中指出,进度将在 50% 或 80% 时挂起。它起作用的唯一方法是,如果我使用logger 或println 语句使系统陷入困境以减慢它的速度。
为什么我的 Future 调用每次都不能可靠地调用?
【问题讨论】:
-
您在
DBUtil中显示没有任何同步的伪代码。很容易想象你在那里抛出了一个异常并且从未注意到。 -
我肯定会检查错误,有
Trys在适当的地方使用,Future的经常使用.onComplete{ case Success => ... case Failure => ...}`出于抽象的目的,上面的伪代码被尽可能地剥离。我只是想知道 Future 调用是否会因任何原因被“跳过”。
标签: scala asynchronous future