【问题标题】:Trying to understand spark streaming flow试图了解火花流
【发布时间】:2017-04-22 08:42:30
【问题描述】:

我有这段代码:

val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    lines.foreachRDD { rdd =>
      val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
      sparkStreamingService.run(df)
    }
    ssc.start()
    ssc.awaitTermination()

我的理解是,foreachRDD 发生在驱动程序级别?所以基本上所有的代码块:

lines.foreachRDD { rdd =>
  val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
  sparkStreamingService.run(df)
}

发生在驱动程序级别? sparkStreamingService.run(df) 方法基本上对当前数据帧进行一些转换以产生一个新的数据帧,然后调用另一个方法(在另一个 jar 上)将数据帧存储到 cassandra。 因此,如果这一切都发生在驱动程序级别,我们没有使用 spark 执行器,我怎样才能使其并行使用执行器来并行处理 RDD 的每个分区

我的 spark 流服务运行方法:

    var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
 metadataDataframe.foreach(rowD => {
      metaData = populateMetaDataService.populateSiteMetaData(rowD)
      val headers = (rowD.getString(2).split(recordDelimiter)(0))

      val fields = headers.split("\u0001").map(
        fieldName => StructField(fieldName, StringType, nullable = true))
      val schema = StructType(fields)

      val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
      val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)

      val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
//      val rawData = dataWithoutHeaders.split(recordDelimiter)
      val rowRDD = rawData
        .map(_.split("\u0001"))
        .map(attributes => Row(attributes: _*))

      val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
      dataFrameFilterService.processBasedOnOpType(metaData, newDF)
    })

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    foreachRDD 的调用确实发生在驱动程序节点上。但是,由于我们在 RDD 级别上运行,因此对其进行的任何转换都将是分布式的。在您的示例中,rdd.map 将导致每个分区被发送到特定的工作节点进行计算。

    由于我们不知道您的 sparkStreamingService.run 方法在做什么,因此我们无法告诉您其执行的位置。

    【讨论】:

    • 我添加了run方法的代码。这是最有效的方法吗?这个方法是并行化的权利:dataFrameFilterService.processBasedOnOpType(metaData, newDF)。我的理解是,我可以避免使用 collect 来加快处理速度,以便并行处理记录?
    【解决方案2】:

    foreachRDD 可以在本地运行,但这只是意味着设置。 RDD本身就是一个分布式集合,所以实际工作是分布式的。

    直接评论文档中的代码:

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record) // executed at the worker
      }
    }
    

    请注意,不基于 RDD 的代码部分在驱动程序处执行。它是使用 RDD 构建的代码,然后分发给工作人员。

    您的代码具体注释如下:

       //df.select will be distributed, but collect will pull it all back in
       var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
     //Since collect created a local collection then this is done on the driver
     metadataDataframe.foreach(rowD => {
          metaData = populateMetaDataService.populateSiteMetaData(rowD)
          val headers = (rowD.getString(2).split(recordDelimiter)(0))
    
          val fields = headers.split("\u0001").map(
            fieldName => StructField(fieldName, StringType, nullable = true))
          val schema = StructType(fields)
    
          val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
          val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)
    
          //This will run locally, creating a distributed record
          val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
    //      val rawData = dataWithoutHeaders.split(recordDelimiter)
          //This will redistribute the work
          val rowRDD = rawData
            .map(_.split("\u0001"))
            .map(attributes => Row(attributes: _*))
          //again, setting this up locally, to be run distributed
          val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
          dataFrameFilterService.processBasedOnOpType(metaData, newDF)
        })
    

    最终,您可能可以将其重写为不需要收集并将其全部分发,但 StackOverflow 不适合您

    【讨论】:

    • 但是在这里:spark.apache.org/docs/latest/streaming-programming-guide.html,如果您向下滚动到他们使用 forEachRdd 的位置,他们会评论说正在驱动程序执行特定的语句
    • @Ahmed 我对其进行了编辑以直接解决该问题,并添加了比文档更明确的内容
    • 是的,这是我的问题,关于那个收藏。截至目前,由于我正在收集,记录是按顺序处理的,然后这些记录中的每一个都分布在执行者之间?然后去掉collect,所有的记录会并行处理而不是顺序处理,对吗?
    • 还有一个问题贾斯汀,所以这基本上是我在这里做的数据帧中的一个数据帧。第二个数据帧是否也分布在执行者之间?我知道第一个数据帧在执行程序之间进行分区,然后从数据帧的这些分区中,我正在创建另一个数据帧,只有创建内部数据帧的执行程序才具有该数据帧,或者也将被分区。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-10-27
    • 2017-04-27
    • 2016-09-27
    • 1970-01-01
    • 2019-04-02
    • 2016-02-07
    • 2015-05-15
    相关资源
    最近更新 更多