【问题标题】:How to process multiple DataFrames concurrently in Spark? [duplicate]如何在 Spark 中同时处理多个 DataFrame? [复制]
【发布时间】:2017-10-31 00:32:42
【问题描述】:

我是 Spark 的(非常)新手,所以我的术语可能不正确,但这是我想要做的:

  • 我每天都有一组 CSV 文件,代表 S3 上数据库中表的快照(称它们为表 A、B、C,存储在例如 s3://bucket/20171027/a.csv.gz
  • 我想在键 (id) 上使用 Spark SQL 连接这些表(一天),然后将连接的表以 JSON 格式保存到 S3。

我可以按顺序(每天)执行此操作,但想利用 Spark 并行化。

我目前的流程大致是:

  • 列出 S3 中的所有文件
  • 按时间戳分组
  • 创建要加入的文件名数组
  • result 是时间戳 => 文件的映射(例如 20171027 => ["s3://foo/20171027/a.csv", "s3://foo/20171027/b.csv"])

然后,我每天将每个文件加载到 DataFrame 中,围绕删除重复列执行一些逻辑,然后调用 df1.join(df2)。连接完成后,我调用df.write.json

每一天都可以独立完成,但我不知道如何让 Spark 同时运行这些连接操作。我尝试使用sc.parallelize 与时间戳作为序列,但我不能让Spark 上下文在执行程序中加载数据帧,如果在调用parallelize 之前加载数据帧,执行程序无法读取它们并抛出一个空对象异常。我想我需要考虑使用期货,但我想知道是否还有其他选择可以完成我想做的事情,或者我是否让它变得比需要的更复杂。

【问题讨论】:

  • 您介意解释一下为什么您的问题与@user8371915 提到的问题重复吗?

标签: apache-spark


【解决方案1】:

我想出的解决方案是使用 Futures 和一个等于 executor 数量的线程池。每天循环,我做连接表的操作,然后在它自己的 Future 中写入磁盘。线程池将并发限制在执行者的数量上。然后它会等待所有期货在完成之前完成。

implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(numExecutors))
val futures = ArrayBuffer[Future[Unit]]()

for (date <- files.keys) {
  val f = Future {
    // load tables from S3 into data frames
    // join data frames on ID
    // write joined dataframe to S3
  }
  futures += f
}

futures.foreach(f => Await.ready(f, Duration.Inf))

【讨论】:

    猜你喜欢
    • 2017-12-15
    • 1970-01-01
    • 2016-01-29
    • 2020-07-30
    • 2018-07-18
    • 1970-01-01
    • 2020-10-30
    • 1970-01-01
    相关资源
    最近更新 更多