【发布时间】:2017-04-22 08:42:30
【问题描述】:
我有这段代码:
val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
ssc.start()
ssc.awaitTermination()
我的理解是,foreachRDD 发生在驱动程序级别?所以基本上所有的代码块:
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
发生在驱动程序级别? sparkStreamingService.run(df) 方法基本上对当前数据帧进行一些转换以产生一个新的数据帧,然后调用另一个方法(在另一个 jar 上)将数据帧存储到 cassandra。 因此,如果这一切都发生在驱动程序级别,我们没有使用 spark 执行器,我怎样才能使其并行使用执行器来并行处理 RDD 的每个分区
我的 spark 流服务运行方法:
var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
metadataDataframe.foreach(rowD => {
metaData = populateMetaDataService.populateSiteMetaData(rowD)
val headers = (rowD.getString(2).split(recordDelimiter)(0))
val fields = headers.split("\u0001").map(
fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)
val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
// val rawData = dataWithoutHeaders.split(recordDelimiter)
val rowRDD = rawData
.map(_.split("\u0001"))
.map(attributes => Row(attributes: _*))
val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
dataFrameFilterService.processBasedOnOpType(metaData, newDF)
})
【问题讨论】:
标签: apache-spark