【发布时间】:2018-01-08 21:47:51
【问题描述】:
我们开发了一个 spark 流应用程序,它从 kafka 获取数据并写入 mongoDB。在输入 DStream 上的 foreachRDD 内创建连接时,我们注意到性能影响。 Spark 流应用程序在插入 mongoDB 之前会进行一些验证。我们正在探索避免为处理的每条消息连接到 mongoDB 的选项,而是希望一次在一个批处理间隔内处理所有消息。以下是 spark 流应用程序的简化版本。我们所做的一件事是将所有消息附加到数据帧并尝试将该数据帧的内容插入到 foreachRDD 之外。但是当我们运行这个应用程序时,将数据帧写入 mongoDB 的代码不会被执行。
请注意,我注释掉了 foreachRDD 中用于将每条消息插入 mongoDB 的部分代码。现有方法非常慢,因为我们一次插入一条消息。非常感谢任何有关性能改进的建议。
谢谢
package com.testing
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.joda.time._
import org.joda.time.format._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON
import scala.io.Source._
import java.util.Properties
import java.util.Calendar
import scala.collection.immutable
import org.json4s.DefaultFormats
object Sample_Streaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Sample_Streaming")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val props = new HashMap[String, Object]()
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)
val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "connectionURI")
.option("spark.mongodb.input.collection", "schemaCollectionName")
.load()
val outSchema = schemaDf.schema
var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)
KafkaDstream.foreachRDD(rdd => rdd.collect().map { x =>
{
val jsonInput: JValue = parse(x)
/*Do all the transformations using Json libraries*/
val json4s_transformed = "transformed json"
val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
val df = sqlContext.read.schema(outSchema).json(rdd)
//Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once
/* df.write.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()*/
outDf = outDf.union(df)
}
}
)
//Added this part of the code in expectation to access the unioned dataframe and insert all messages at once
//println(outDf.count())
if(outDf.count() > 0)
{
outDf.write
.option("spark.mongodb.output.uri", "connectionURI")
.option("collection", "Collection")
.mode("append").format("com.mongodb.spark.sql").save()
}
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
【问题讨论】:
-
我对“我们希望一次处理 DStream 中的所有消息”感到困惑。 DStreams 是无限的......你的意思是一次处理一个批处理间隔内的所有消息吗?
-
是的,我的意思是一次处理一个批处理间隔内的所有消息。很抱歉造成混乱
标签: mongodb scala apache-spark spark-streaming