【问题标题】:How do you perform blocking IO in apache spark job?您如何在 apache spark 作业中执行阻塞 IO?
【发布时间】:2014-09-22 23:02:30
【问题描述】:

如果我在遍历 RDD 时,需要通过调用外部(阻塞)服务来计算数据集中的值怎么办?您认为如何实现?

值:Future[RDD[Double]] = Future sequence tasks

我尝试创建一个 Futures 列表,但由于 RDD id not Traversable,Future.sequence 不适合。

我只是想知道,是否有人遇到过这样的问题,您是如何解决的? 我想要实现的是在单个工作节点上获得并行性,因此我可以每 调用该外部服务 3000 次。

可能还有另一种更适合spark的解决方案,比如在单个主机上拥有多个工作节点。

有趣的是,您如何应对这样的挑战?谢谢。

【问题讨论】:

  • 你需要计算什么样的值?是否可以离线计算并加入您的数据集?或者远程代码可以作为 jar 拉入并在进程中计算?
  • 该值是通过比较提供的输入和 RDD 中的每个项目计算得出的。所以我遍历RDD并比较每个元素。 Comarisson 是一个阻塞调用,因为它隐藏在本机组件中。这就是为什么我想知道,你将如何做到这一点,如果你有这个挑战?非常感谢您的帮助。

标签: scala parallel-processing apache-spark


【解决方案1】:

这是我自己的问题的答案:

val buckets = sc.textFile(logFile, 100)
val tasks: RDD[Future[Object]] = buckets map { item =>
  future {
    // call native code
  }
}

val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
  val searchFuture: Future[Iterator[Object]] = Future sequence f
  Await result (searchFuture, JOB_TIMEOUT)
}

这里的想法是,我们得到分区的集合,其中每个分区被发送到特定的工作人员并且是最小的工作。每一项工作都包含数据,可以通过调用本机代码并发送该数据来处理这些数据。

'values' 集合包含从本机代码返回的数据,并且该工作在整个集群中完成。

【讨论】:

  • 我们现在面临同样的问题。你有一个如何使用任务的例子吗?谢谢,
  • spark 是否会自动为您的 future 运行提供隐式 ExecutionContext?
  • 要在@advait 问题上进行开发,在这种情况下了解 spark 如何管理并发会非常有趣
【解决方案2】:

根据您的回答,阻塞调用是将提供的输入与 RDD 中的每个单独项目进行比较,我强烈考虑在 java/scala 中重写比较,以便它可以作为 Spark 进程的一部分运行。如果比较是一个“纯”函数(没有副作用,仅取决于其输入),它应该可以直接重新实现,并且由于不必进行远程操作,从而降低火花过程的复杂性并提高稳定性电话可能会让它值得。

您的远程服务似乎不太可能每秒处理 3000 次调用,因此最好使用本地进程内版本。

如果由于某种原因这绝对不可能,那么您也许可以创建一个 RDD 转换,以伪代码将您的数据转换为期货的 RDD:

val callRemote(data:Data):Future[Double] = ...

val inputData:RDD[Data] = ...

val transformed:RDD[Future[Double]] = inputData.map(callRemote)

然后从那里继续,计算你的 Future[Double] 对象。

如果您知道远程进程可以处理多少并行度,最好放弃 Future 模式并接受它是一个瓶颈资源。

val remoteParallelism:Int = 100 // some constant

val callRemoteBlocking(data:Data):Double = ...

val inputData:RDD[Data] = ...

val transformed:RDD[Double] = inputData.
  coalesce(remoteParallelism).
  map(callRemoteBlocking)

你的工作可能需要相当长的时间,但它不应该淹没你的远程服务并可怕地死掉。

最后一个选择是,如果输入是合理可预测的,并且结果范围是一致的并且限制在一些合理数量的输出(数百万左右),您可以使用远程服务将它们全部预先计算为一个数据集并找到他们在 spark 工作时间使用连接。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-09-22
    • 1970-01-01
    • 1970-01-01
    • 2017-05-03
    • 2013-05-24
    • 2015-09-03
    • 2014-04-01
    • 1970-01-01
    相关资源
    最近更新 更多