【问题标题】:Scala's Future and ExecutionContext ExecutionScala 的 Future 和 ExecutionContext 执行
【发布时间】:2016-05-11 18:25:01
【问题描述】:

假设我有以下一组在 Future 中执行某些操作的代码:

1 to 10 foreach {
  case x => Future { x + x }
}

假设我给这段代码默认的ExecutionContext,我知道后台发生了什么,但我想知道的是Future的处理到底是怎么做的?我的意思是应该有一些线程或一组线程可能正在等待 Future 完成?这些线程被阻塞了吗?从某种意义上说,他们正在等待 Future 完成?

现在在以下场景中:

val x: Future[MyType] = finishInSomeFuture()

假设 x 有一个超时,我可以这样调用:

Future {
  blocking {
    x.get(3, TimeOut.SECONDS)
  }
} 

我真的在阻止吗?有没有更好的异步超时方法?

编辑:以下 Timeout 与我上面定义的阻塞上下文相比有何不同或更好?

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}

【问题讨论】:

  • ExecutionContext 使用具有窃取语义的 ForkJoinPool。阅读:blog.jessitron.com/2014/02/…
  • @Mika'il 假设您使用的是默认执行上下文并且不提供自己的池
  • x.get 方法是什么?你用的是scala内置的Futures吗?
  • 假设我有一种方法,比如在给定的持续时间内获取那种块。我想知道如何在不阻塞的情况下超时?

标签: scala future


【解决方案1】:
  1. 假设我有以下一组在 Future 中执行某些操作的代码:

    1 to 10 foreach {
      case x => Future { x + x }
    }
    

    ...

您的代码创建了十个Futures,这些Futures 立即设置为使用隐式ExecutionContext 提供的线程执行。由于您不存储对期货的引用,也不等待它们的执行,因此您的主线程(定义了 foreach 的地方)不会阻塞并立即继续执行。如果那段代码在main 方法的末尾,那么,取决于ThreadFactory 中的ExecutionContext 是否产生了daemon,线程程序可能会在不等待Futures 完成的情况下退出。

  1. 现在在以下场景中:

    val x: Future[MyType] = finishInSomeFuture()
    

    假设 x 有一个超时,我可以这样调用:

    Future {
      blocking {
        x.get(3, TimeOut.SECONDS)
      }
    } 
    

    我真的在阻止吗?有没有更好的异步超时方法?

你可能指的是Await.result而不是x.get

def inefficientTimeoutFuture[T](f:Future[T], x:Duration) = Future { Await.result(f, x) }

在这种情况下,未来的f将在单独的线程中计算,而额外的线程将被阻塞等待f的计算。

Using scheduler to create TimeoutFuture 效率更高,因为调度程序通常共享固定数量的线程(通常是一个),而 Await.result 中的阻塞将始终需要额外的线程来阻塞。

  1. 我想知道如何在不阻塞的情况下超时?

使用调度程序创建 TimeoutFuture 允许您在不阻塞的情况下超时操作。您将 Future 包装在超时助手中,并且新的 Future 要么成功完成,要么由于超时而失败(以先到者为准)。新的 Future 具有相同的异步特性,由您决定如何使用它(注册 onComplete 回调或同步等待结果,阻塞主线程)。


UPD 我将尝试澄清一些关于多线程和阻塞的基本知识。

目前异步非阻塞方法是趋势,但您必须了解阻塞意味着什么以及为什么应该避免它。

Java 中的每个线程都是有代价的。首先,创建新线程相对昂贵(这就是存在线程池的原因),其次,它会消耗内存。为什么不是CPU?因为您的 CPU 资源受到您拥有的核心数量的限制。不管你有多少活动线程,你的并行度总​​是会受到核心数量的限制。如果线程处于非活动状态(阻塞),它不会消耗 CPU。

在现代 Java 应用程序中,您可以创建相当多的线程(数千个)。问题在于,在某些情况下,您无法预测需要多少线程。这就是异步方法发挥作用的时候。它说:与其在其他线程完成工作时阻塞当前线程,不如在回调中包装我们的下一步并将当前线程返回到池中,以便它可以做一些其他有用的工作。所以几乎所有的线程都在忙着做实际的工作,而不仅仅是等待和消耗内存。

现在来看计时器的例子。如果您使用基于网络的HashedWheelTimer,您可以让它由单线程支持并安排数千个事件。当您创建被阻塞等待超时的Future 时,您每个“计划”占用一个线程。因此,如果您安排了数千个超时,您最终将获得数千个阻塞线程(这再次消耗内存,而不是 cpu)。

现在你的“主要”未来(你想在超时中包装)也不必阻塞线程。例如,如果你在future内部执行同步http请求,你的线程会被阻塞,但是如果你使用基于netty的AsyncHttpClient(例如),你可以使用一个不占用线程的基于promise的future。在这种情况下,您可以拥有少量固定数量的线程来处理任意数量的请求(数十万个)。


UPD2

但是即使在 Timer 的情况下,也应该有一些线程应该阻塞,因为它必须等待 Timeout 毫秒。那么好处在哪里?我仍然阻止,但在 Timer 情况下我可能会阻止更少还是?

这仅适用于一种特定情况:当您有等待异步任务完成的主线程时。在这种情况下,您是对的,如果不阻塞主线程,就无法在超时中包装操作。在这种情况下使用 Timers 没有任何意义。你只需要额外的线程来执行你的操作,而主线程等待结果或超时。

但通常Futures 用于更复杂的场景,没有“主”线程。例如,假设异步网络服务器,请求进来,你创建 Future 来处理它并注册回调来回复。没有“主”线程等待任何东西。

或者另一个例子,您想向外部服务发出 1000 个请求,并分别超时,然后将所有结果收集到一个地方。如果您有该服务的异步客户端,则创建 1000 个请求,将它们包装在异步超时中,然后组合成一个 Future。您可以阻止主线程等待该未来完成或注册回调以打印结果,但您不必创建 1000 个线程来等待每个单独的请求完成。

所以,重点是:如果您已经有同步流并且您想在超时中包装其中的一部分,您唯一能做的就是让您当前的线程阻塞,直到其他线程执行该工作。 如果你想避免阻塞,你需要从一开始就使用异步方法。

【讨论】:

  • 您能否详细说明 TimeoutFuture 如何帮助避免阻塞?
  • @sparkr,你到底想让我澄清什么?创建 TimeoutFuture 的父线程不会被阻塞,除非您选择,否则调度程序线程是共享的,因此不重要,执行未来计算的执行线程很忙,但这是不可避免的,因为必须以某种方式执行计算。
  • 我的意思是我会通过使用 TimeoutFuture 获得什么?我仍然必须有一个等待超时的线程,不是吗?我看到Scheduler线程是共享的,所以你的意思是说阻塞在共享线程上比阻塞在执行线程上更好?
  • 感谢您的回复!但我还是不明白这句话是如何工作的“使用调度程序创建 TimeoutFuture 允许您在不阻塞的情况下超时操作”
  • @sparkr,“超时”是什么意思?也许我们在谈论不同的事情。请描述你的场景。澄清一下,两种方法在语义上是相同的,但其中一种方法需要每个请求额外的线程,而第二种方法可以使用少量固定数量的线程。
猜你喜欢
  • 1970-01-01
  • 2019-11-16
  • 2020-08-16
  • 1970-01-01
  • 2016-08-26
  • 1970-01-01
  • 1970-01-01
  • 2021-10-27
  • 1970-01-01
相关资源
最近更新 更多