【问题标题】:scala (spark) zio convert future to zioscala (spark) zio 将 future 转换为 zio
【发布时间】:2020-04-28 17:54:10
【问题描述】:

我的目标是在一个数据集上运行多个 spark ml 回归模型(1000 次),我想使用 zio 而不是 future,因为它运行速度太慢。下面是使用 Future 的工作示例。 不同的键列表用于过滤键上的分区数据集并在其上运行模型。我设置了一个有 8 个执行器的线程池来管理它,但它的性能很快就会下降。

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.duration._
import org.apache.spark.sql.SaveMode

val pool = Executors.newFixedThreadPool(8)
implicit val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)

case class Result(key: String, coeffs: String)

try {

    import spark.implicits._
    val tasks = {
    for (x <- keys)
        yield Future {
        Seq(
            Result(
            x.group,
            runModel(input.filter(col("group")===x)).mkString(",")
            )
        ).toDS()
            .write.mode(SaveMode.Overwrite).option("header", false).csv(
            s"hdfs://namenode:8020/results/$x.csv"
            )
        }
    }.toSeq
    Await.result(Future.sequence(tasks), Duration.Inf)
}
finally {
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}

我尝试在 zio 中实现这一点,但我不知道如何实现队列并设置执行器的限制,就像在期货中一样。

以下是我目前失败的尝试...

import zio._
import zio.console._
import zio.stm._
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

//example data/signatures
case class ModelResult(key: String, coeffs: String)
case class Data(key: String, sales: Double)
val keys: Array[String] = Array("100_1", "100_2")
def runModel[T](ds: Dataset[T]): Vector[Double]

object MyApp1 extends App {

  val spark = SparkSession
    .builder()
    .getOrCreate()

  import spark.implicits._

  val input: Dataset[Data] = Seq(Data("100_1", 1d), Data("100_2", 2d)).toDS

  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {

    for {
      queue <- Queue.bounded[Int](8)
      _ <- ZIO.foreach(1 to 8) (i => queue.offer(i)).fork
      _ <- ZIO.foreach(keys) { k => queue.take.flatMap(_ => readWrite(k, input, queue)) }
    } yield 0
  }

  def writecsv(k: String, v: String) = {

    Seq(ModelResult(k, v))
    .toDS
    .write
    .mode(SaveMode.Overwrite).option("header", value = false)
    .csv(s"hdfs://namenode:8020/results/$k.csv")
  }

  def readWrite[T](key: String, ds: Dataset[T], queue: Queue[Int]): ZIO[ZEnv, Nothing, Int] = {

    (for {
      result <- runModel(ds.filter(col("key")===key)).mkString(",")
      _ <- writecsv(key, result)
      _ <- queue.offer(1)
      _ <- putStrLn(s"successfully wrote output for $key")
    } yield 0)
  }
}

//to run
MyApp1.run(List[String]())

在 zio 中处理计算的最佳方法是什么?

【问题讨论】:

    标签: scala apache-spark zio


    【解决方案1】:

    要在 8 个线程上并行处理一些工作负载,您只需要

    ZIO.foreachParN(8)(1 to 100)(id => zio.blocking.blocking(Task{yourClusterJob(id)}))
    

    但不要指望在这里从 Futures 切换到 ZIO 会有很大的提升:

    1) 实际工作负载主导协调开销,因此 ZIOFuture 之间的差异应该很小。

    2) 也许您根本不会得到任何提升,因为 8 个任务将争夺 Spark 集群中的同一个资源池。

    【讨论】:

    • 谢谢!是的,您对第 1 点和第 2 点是正确的。我现在对这个问题采取了不同的方法,因为 spark ml 对于这种类型的工作来说太慢了。
    猜你喜欢
    • 2020-10-22
    • 2021-07-17
    • 2021-07-21
    • 2021-08-04
    • 2020-11-01
    • 2019-06-16
    • 1970-01-01
    • 2022-12-01
    • 2023-04-07
    相关资源
    最近更新 更多