【问题标题】:How can I efficiently send data in a parallelized way to a REST endpoint using Spark?如何使用 Spark 以并行方式有效地将数据发送到 REST 端点?
【发布时间】:2025-12-14 04:45:02
【问题描述】:

我有一个存储在 HDFS 中的大文件 mydata.txt,其中每一行都包含必须提交到 REST 端点的数据。我想知道如何有效地对数据(文件中的行)进行分组/分区,然后使用OkHttp 将它们提交到 REST 端点。我想对数据进行分组/分区,因为我不想创建太多 HTTP 客户端,也不想分配工作负载。

例如,我目前有以下类似的东西。

val sc = new SparkContext(new SparkConf())
val client = new OkHttpClient
val input = "hdfs://myserver/path/to/mydata.txt"

sc.textFile(input)
 .foreach(line => {
  val request = new Request.Builder()
   .url("http://anotherserver/api/data")
   .post(RequestBody.create(MediaType.parse("application/json"), line))
   .build()
  client.newCall(request).execute()
 })

据我了解,foreach 是一个Action,因此它在驱动程序上被调用,因此,client 不必被序列化并且可以用于所有数据(行)。当然,这个解决方案不是并行的。

我也考虑过分区,但我认为foreachPartition 也是Action

sc.textFile(input)
 .map(line => (Random.nextInt(10), line))
 .partitionBy(new HashPartitioner(10))
 .foreachPartition(iter => {
  while(iter.hasNext) {
   val item = iter.next()
   val line = item._2
   //submit to REST endpoint
  }
 })

关于如何使用 Spark 并行化将数据提交到 REST 端点的工作有什么想法吗?

编辑原来OkHttpClient是不可序列化的,甚至不能在foreach循环中使用。

【问题讨论】:

    标签: apache-spark parallel-processing okhttp3


    【解决方案1】:

    解决这类问题的典型方法如下:

    1. 确保您要使用的 REST 库可供所有执行程序使用。这样就无需担心序列化了。

    2. 根据核心数量选择并发级别。

    3. 重新分区您的数据,使#partitions >= k * #executors。在访问具有可变吞吐量的外部服务时,我使用较大的k,例如 5-10,以减少一批“慢”输入拖慢整个作业的可能性。

    4. map() 数据并在映射函数的主体内设置客户端,从而消除了序列化问题。返回一对输入和成功/失败以及任何诊断信息。

    5. 过滤失败并决定如何处理它们,例如重新处理它们(您甚至可以保留重试次数)。

    如果设置 HTTP 客户端的成本很高,请使用 mapPartitions() 而不是 map(),因为它允许您设置一次客户端并使用它处理许多输入。

    基础版:

    def restCall(url: String): MyResultOrError = ...
    val numCoresPerExecutor = ...
    val numCores = numCoresPerExecutor * (sc.getExecutorStorageStatus.length - 1)
    val result = rdd
      .repartition(5 * numCores)
      .map(url => (url, restCall(url)))
    

    【讨论】:

    • mapPartitions 方法对我有用。但是,除非我在最后打电话给action,比如collect,否则什么都没有发生。
    • 当然,这就是 Spark 的工作原理。转换只是构建一个执行 DAG,直到调用一个操作。