【问题标题】:How to schedule scala Future from persist quartz job如何从坚持石英作业安排scala Future
【发布时间】:2020-02-10 20:41:01
【问题描述】:

假设,有一些 scala 代码应该在java quartz library 的帮助下安排。 我们需要将此代码执行的结果存储在作业上下文中,以便在下一次作业执行中访问此结果。 例如,有一些CounterService 有一个inc 函数,应该被调度:

trait CounterService {
  def inc(): Int
}

以下quartz-job成功调用inc并将其结果存储在JobDataMap中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: CounterService = ...

  override def execute(context: JobExecutionContext): Unit = {

    val newCounterValue: Int = counterService.inc()

    val map = context.getJobDetail.getJobDataMap
    map.put("counter", newCounterValue)  
  }
}

我们可以随时在其他地方获得工作结果(如果我们有参考scheduler):

val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job")) 
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala 

但是如果我们想从quartz-job 执行async 代码,这个方法就不起作用了。 例如,假设我们的柜台服务如下:

trait AsyncCounterService {
  def asyncInc(): Future[Int]
}

我们可以尝试通过以下方式来实现我们的工作。但它不能正常工作,因为 方法CounterJob.execute 可以早于asyncCounterService.asyncInc 执行。 而且我们无法将asyncInc 的结果存储在JobDataMap 中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: AsyncCounterService = ...  
  val execContext: ExecutionContext = ...

  override def execute(context: JobExecutionContext): Unit = {

    // # 1: we can not influence on the execution flow of this future 
    //      from job scheduler.
    val counterFuture: Future[Int] = counterService.asyncInc() 

    counterFuture.map { counterValue: Int =>

      val map = context.getJobDetail.getJobDataMap  
      // #2: this action won't have any effect
      map.put("counter", counterValue)              
    }
  }
}

以上代码中标记为#1 ...#2 ... cmets的这个解决方案至少有两个问题。

有没有更好的做法来解决这个问题? 换句话说,如何从持久石英作业中调度 scala Future,并将 Future's 结果存储在 JobDetailData 映射中?

【问题讨论】:

    标签: java scala quartz-scheduler


    【解决方案1】:

    如果 CounterJob 之后的所有内容都需要具有 counterService 值,则可以在 CounterJob 中阻塞并等待 Future。无论如何,在那段时间内什么都不能执行,因为值还没有计算出来。

    import scala.concurrent.{Await,Future}
    ...
    
     try {
          val counterValue  = Await.result(counterFuture, 5.seconds)
          map.put("counter", counterValue)       
        } catch {
          case t: TimeoutException => ...
          case t: Exception => ...
       }
    
    

    如果您在该作业中有多个异步期货,您可以将它们与 flatMap, map 操作的单子链和 for comprehension 或来自 Future 伴随对象的静态辅助方法组合,例如 Future.sequence 那么 endresult 将是一个结合所有异步操作的未来,您可以通过Await 等待。

    通常,等待期货被认为是一种不好的做法。因为这会阻止执行器线程在等待未来完成时执行任何其他操作。

    但是,在这里,您将另一个作业调度框架与另一个并发范式混合在一起。如上所述,在特定示例中,阻塞是可以的,因为后面的一切都依赖于第一次计算。

    如果其他作业可以同时运行,将有多种解决方法:

    1. 有一种方法可以从工作中恢复未来。 然后你可以等待这个未来完成,然后再调度依赖 工作。
    2. 作业中有某种自定义事件侦听器机制,可以从作业中触发。 counterFuture.map {context.notify("computationReady")}
    3. 有特定的 AsyncJob 支持非阻塞 io,它期望 java Future 作为返回。然后你可以将 Scala Future 转换为 Java Future

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-12-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-08
      相关资源
      最近更新 更多