【问题标题】:Future called in a while loop is not called every time在 while 循环中调用的 Future 不会每次都被调用
【发布时间】:2018-06-03 13:19:44
【问题描述】:

我正在构建一个实用程序,该实用程序可以监视并保留在更大系统中正在处理的文件的进度。该文件是一个大的“文本”文件,.csv、.xls、.txt 等。这可能是来自 Kafka 的流数据,将其写入 Avro,或者将其批量写入 SQL DB。我正在尝试构建一个“catchall”实用程序,它记录处理的行数并使用 RESTful API 调用将进度保存到数据库。

无论处理类型如何,处理总是在 Akka Actor 内完成。我正在尝试异步记录进度,以避免阻塞处理进度。进展非常迅速。大部分都以类似的批处理格式发生,尽管有时它会一个一个地递增,这里是处理过程中会发生什么的基本表示,仅用于演示:

//inside my processing actor

  var fileIsProcessing = true
  val allLines = KafkaUtil.getConnect(fileKey)
  val totalLines = KafkaUtil.getSize
  val batchSize = 500
  val dBUtil = new DBUtil(totalLines)

 while (fileIsProcessing) {

    // consumes @ 500 lines at a time to process, returns empty if done consuming
    val batch:List[Pollable] = allLines.poll
    //for batch  identification purposes
    val myMax = batch.map(_.toInt ).max
    println("Starting new batch with max line: " + myMax)

    //processing work happens here
    batch.map(processSync)
    println("Finished processing batch with max line: " + myMax)

    //send a progress update to be persisted to the DB
    val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)}
    progressCall.onComplete{
          case Success(s) => // don't care
          case Failure(e) => logger.error("Unable to persist progress from actor ") 
    }

 if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional.
}

还有,我的 DBUtil 的简单表示,即进行处理的类:

class DBUtil(totalLines:Int) {

    //store both the number processed and the total to process in db, even if there is currently a percentage

var rate = 0 //lines per second
var totalFinished = 0
var percentageFin:Double = 0
var lastUpdate = DateTime.now()

def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = {
  //simulate write the data and calculated progress percentage to db
  rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000)
  totalFinished += totalProcessed
  percentageFin = (totalFinished.toDouble / totalLines.toDouble) * 100
  println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate" )
}

}

现在,真正奇怪的是,在生产中,处理过程发生得如此之快,以至于Future[Unit] { dBUtil.incrementProgress(batch.size)} 行并不是每次都能可靠地调用。 while 循环将完成,但我会在我的数据库中指出,进度将在 50% 或 80% 时挂起。它起作用的唯一方法是,如果我使用loggerprintln 语句使系统陷入困境以减慢它的速度。

为什么我的 Future 调用每次都不能可靠地调用?

【问题讨论】:

  • 您在DBUtil 中显示没有任何同步的伪代码。很容易想象你在那里抛出了一个异常并且从未注意到。
  • 我肯定会检查错误,有Trys在适当的地方使用,Future的经常使用.onComplete{ case Success => ... case Failure => ...}`出于抽象的目的,上面的伪代码被尽可能地剥离。我只是想知道 Future 调用是否会因任何原因被“跳过”。

标签: scala asynchronous future


【解决方案1】:

嗯...所以您的代码几乎没有问题,

您只是在 while 循环中启动期货,然后您的循环进入下一次迭代,而无需等待未来完成。这意味着您的程序可能在执行者实际执行期货之前完成。

此外,您的循环正在创建对dBUtil.incrementProgress(batch.size) 的越来越多的“未来主义”调用,您将有多个线程同时执行相同的函数。当您使用可变状态时,这将导致竞争条件。

def processFileWithIncrementalUpdates(
  allLines: ????,
  totalLines: Int,
  batchSize: Int,
  dbUtil: DBUtil
): Future[Unit] = {
  val promise = Promise[Unit]()
  Future {
    val batch: List[Pollable] = allLines.poll
    if (batch.isEmpty) {
      promise.completeWith(Future.successful[Unit]())
    }
    else {
      val myMax = batch.map(_.toInt).max
      println("Starting new batch with max line: " + myMax)

      //processing work happens here
      batch.map(processSync)
      println("Finished processing batch with max line: " + myMax)

      //send a progress update to be persisted to the DB
      val progressCall = Future[Unit] { dBUtil.incrementProgress(batch.size) }

      progressCall.onComplete{
        case Success(s) => // don't care
        case Failure(e) => logger.error("Unable to persist progress from actor ")
      }

      progressCall.onComplete({
        case _ => promise.completeWith(processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil))
      })
    }
    promise.future
  }
}

val allLines = KafkaUtil.getConnect(fileKey)
val totalLines = KafkaUtil.getSize
val batchSize = 500
val dBUtil = new DBUtil(totalLines)

val processingFuture = processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil)

【讨论】:

  • >您只是在 while 循环中启动期货,然后您的循环 > 进行下一次迭代,而无需等待未来完成。这 > 意味着您的程序可能在期货实际被执行者执行之前完成。这正是我想要的——但是对 Future 的调用真的会“丢失”吗?我认为所有循环都会完成,但然后回去尝试“赶上”所有未来的电话。我真的提倡在 Actor 中进行调用,因为“一劳永逸”和队列保持顺序,但由于开销而被拒绝。
  • 对不起,您的报价格式,编辑速度不够快。
  • 主要问题是你在调用incrementProgress函数的不同线程之间共享了可变状态。
  • 我知道,代码是占位符伪代码。而这一切的状态真的不是我的问题。主要问题是关于期货的生命周期,如果其余代码“完成”,它们真的会被“注销”或被忽略吗?
  • 您对未来“生命周期”的定义取决于它们计划执行的执行上下文的生命周期。因此,当拥有执行上下文的进程退出(完成)时,执行上下文和期货也一样。
猜你喜欢
  • 2013-03-10
  • 2015-10-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-04-27
  • 1970-01-01
  • 1970-01-01
  • 2017-09-15
相关资源
最近更新 更多