【问题标题】:In Apache Spark, how to convert a slow RDD/dataset into a stream?在 Apache Spark 中,如何将慢速 RDD/数据集转换为流?
【发布时间】:2019-12-20 06:27:00
【问题描述】:

我正在调查一个有趣的案例,该案例涉及对慢速 RDD 或数据集进行广泛转换(例如重新分区和连接),例如以下代码定义的数据集:

val ds = sqlContext.createDataset(1 to 100)
  .repartition(1)
  .mapPartitions { itr =>
    itr.map { ii =>
      Thread.sleep(100)
      println(f"skewed - ${ii}")
      ii
    }
  }

慢速数据集是相关的,因为它类似于远程数据源的视图,并且分区迭代器源自单线程网络协议(http、jdbc 等),在这种情况下,下载速度 >单线程处理的速度,但是

不幸的是,传统的 Spark 计算模型在慢速数据集上效率不高,因为我们受限于以下选项之一:

  1. 仅使用窄转换 (flatMap-ish) 在单个线程中对流进行端到端的数据处理,显然数据处理将成为瓶颈,并且资源利用率会很低。

  2. 使用宽操作(包括重新分区)来平衡 RDD/数据集,虽然这对于并行数据处理效率至关重要,但 Spark 粗粒度调度程序要求完全完成下载,这成为另一个瓶颈.

实验

下面的程序是对这种情况的简单模拟:

val mapped = ds

val mapped2 = mapped
  .repartition(10)
  .map { ii =>
    println(f"repartitioned - ${ii}")
    ii
  }

mapped2.foreach { _ =>
  }

在执行上述程序时,可以观察到在RDD依赖中,println(f"repartitioned - ${ii}")行将不会在println(f"skewed - ${ii}")行之前执行。

我想指示 Spark 调度程序在其任务完成之前开始分发/传送分区迭代器生成的数据条目(通过微批处理或流等机制)。有没有一种简单的方法可以做到这一点?例如。将慢速数据集转换为结构化流会很好,但应该有更好集成的替代方案。

非常感谢您的意见

更新:为了让您的实验更容易,我附加了可以开箱即用的 scala 测试:

package com.tribbloids.spookystuff.spike

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}

@Ignore
class SlowRDDSpike extends FunSpec {

  lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  lazy val sc: SparkContext = spark.sparkContext
  lazy val sqlContext: SQLContext = spark.sqlContext

  import sqlContext.implicits._

  describe("is repartitioning non-blocking?") {

    it("dataset") {

      val ds = sqlContext
        .createDataset(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds

      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ =>
        }
    }
  }

  it("RDD") {
    val ds = sc
      .parallelize(1 to 100)
      .repartition(1)
      .mapPartitions { itr =>
        itr.map { ii =>
          Thread.sleep(100)
          println(f"skewed - $ii")
          ii
        }
      }

    val mapped = ds

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
      }

  }
}

【问题讨论】:

  • 我很犹豫是否建议它,因为它构建得非常糟糕,但是您是否尝试过使用 Spark Streaming ?如果是这样,什么不起作用?
  • 我标题中的'stream'指的是Spark Streaming的2个实现,我没有找到解决方案
  • “Spark 粗粒度调度程序要求下载完全完成” - 为什么会出现这个问题?你能指定数据源吗?
  • 是的,它很慢,需要几分钟

标签: scala apache-spark apache-spark-sql spark-streaming


【解决方案1】:

我认为设置一个队列来存储消息可能是个好主意,这样以后您就可以添加一个 Spark Streaming 进程来摄取它们。如果您可以使用 JDBC 访问您的源,为什么不添加一个从该源读取数据并将数据存储在主题(例如 Kafka、Kinesis、SGS、ZeroMQ)的进程,以便您可以从那里的 Spark Streaming 连接?这种架构将提取和处理解耦(因为它们是不同的东西)。

通过这个过程:

  • 如果您(或其他人)想要将提取的数据用于其他处理,也可以重复使用这些数据
  • 此外,由于只能将 1 个连接设置为源连接,因此您可以在该进程上配置不同的计划,这样您就不必一直使用连接(如果需要)
  • 你减少了对源的压力

该进程可以基于运行在可以访问两者(例如源和 Kafka)的机器上的守护进程(例如),因此它不会消耗 Yarn Namenode 上的资源(如果您使用的是基于 Spark 的在 Hadoop 上)

【讨论】:

  • 非常感谢这个想法,我现在实际上正在尝试实现它。我最担心的是发布任务一直占用资源池,除了等待队列被消耗之外什么都不做。如果这是本机功能,则发布者的资源应该已经由流消费者处理
  • 我认为您可以使用两种方法来解决该问题:1)将发布者实现为流,并具有控制背压的机制(例如使用 akka doc.akka.io/docs/akka/current/stream/stream-refs.html)。这将保持对源的持续压力。 2)或者在源上注入一种机制,使其与基于数据库推送的发布隔离(即实现日志传送(例如),如您在 SQL Server docs.microsoft.com/en-us/sql/database-engine/log-shipping/… 上看到的那样)。
【解决方案2】:

首先感谢您提供实验代码。 这个问题取决于数据源(请参阅下面的为什么有关数据源的信息是必不可少的部分)。

话虽如此,这里的主要问题是创建更多分区同时避免洗牌。不幸的是,repartition 是operations which requires shuffle 之一。

在您的示例中,您可以使用union 增加分区数量而无需随机播放。

var ds: Dataset[Int] = Seq[Int]().toDS()
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
  println(sequence)
  sqlContext.createDataset(sequence)
}).foreach(sequenceDS =>  {
  ds = ds.union(sequenceDS)
})

使用联合数据集的结果: 经过时间:24980 毫秒 分区数:41

如果没有联合,总时间是34493 ms,因此我们看到本地机器的显着改进。

这避免了随机播放,但会创建多个到给定 http 端点或数据库连接的连接。这是用于管理并行性的common practice

无需将数据集转换为流式传输,因为流式传输适用于数据集。如果您的数据源支持流式传输,您可以使用它来生成数据集,而无需从批处理转换到流式传输。如果您的数据源不支持流式传输,您可以考虑使用custom receivers


为什么有关数据源的信息至关重要:

  1. 从给定数据源读取数据时,您能否控制初始数据集的分区数?
  2. 可接受的请求率或与您的数据源的连接数是多少?
  3. 涉及多少数据?随机播放是一种选择吗?
  4. 您的数据源是否支持火花流?一些数据源(kinesis、Kafka、文件系统、ElasticSearch)支持流式传输,而另一些则不支持。

完整逻辑:

  it("dataset_with_union") {
    val start = System.nanoTime()
    var ds: Dataset[Int] = Seq[Int]().toDS()
    val sequences = (1 to 100).grouped(10)
    sequences.map(sequence => {
      println(sequence)
      sqlContext.createDataset(sequence)
    }).foreach(sequenceDS =>  {
      ds = ds.union(sequenceDS)
    })

    ds.mapPartitions { itr =>
      itr.map { ii =>
        Thread.sleep(100)
        ii
      }
    }

    // Number of partitions here is 41
    println(f"dataset number or partitions: ${ds.rdd.getNumPartitions}")
    val mapped = ds

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
    }

    val end = System.nanoTime()
    println("Elapsed time: " + (end - start) + "ns")
  }

【讨论】:

  • 从给定数据源读取时,能否控制初始 Dataset 的分区数?:不能,一个任务中的 executor 上只能创建 1 个连接
  • 您的数据源可接受的请求率或连接数是多少?只有 1,而且总是从头开始,而不是从中间开始
  • 涉及多少数据?洗牌是一种选择吗?数据大小是可变的,但通常需要几分钟才能传输。随机播放是我现在正在做的,但它会导致太多延迟
  • 您的数据源是否支持 Spark 流式传输?这是g'old JDBC,我想我可以为它写一个源代码V2,但为什么要重新发明轮子
  • 顺便说一句,批转流是非常严重的需求,是Apache Flink的原生执行模式
猜你喜欢
  • 2018-06-14
  • 2016-12-12
  • 2017-07-08
  • 1970-01-01
  • 2016-05-12
  • 2023-03-26
  • 1970-01-01
  • 2016-04-21
相关资源
最近更新 更多