【问题标题】:Avoid multiple connections to mongoDB from spark streaming避免火花流与 mongoDB 的多个连接
【发布时间】:2018-01-08 21:47:51
【问题描述】:

我们开发了一个 spark 流应用程序,它从 kafka 获取数据并写入 mongoDB。在输入 DStream 上的 foreachRDD 内创建连接时,我们注意到性能影响。 Spark 流应用程序在插入 mongoDB 之前会进行一些验证。我们正在探索避免为处理的每条消息连接到 mongoDB 的选项,而是希望一次在一个批处理间隔内处理所有消息。以下是 spark 流应用程序的简化版本。我们所做的一件事是将所有消息附加到数据帧并尝试将该数据帧的内容插入到 foreachRDD 之外。但是当我们运行这个应用程序时,将数据帧写入 mongoDB 的代码不会被执行。

请注意,我注释掉了 foreachRDD 中用于将每条消息插入 mongoDB 的部分代码。现有方法非常慢,因为我们一次插入一条消息。非常感谢任何有关性能改进的建议。

谢谢

package com.testing

import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.joda.time._
import org.joda.time.format._

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON

import scala.io.Source._
import java.util.Properties
import java.util.Calendar

import scala.collection.immutable
import org.json4s.DefaultFormats


object Sample_Streaming {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Sample_Streaming")
      .setMaster("local[4]")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    val props = new HashMap[String, Object]()


    val bootstrap_server_config = "127.0.0.100:9092"
    val zkQuorum = "127.0.0.101:2181"



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val TopicMap = Map("sampleTopic" -> 1)
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)

      val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", "connectionURI")
        .option("spark.mongodb.input.collection", "schemaCollectionName")
        .load()

      val outSchema = schemaDf.schema
      var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x =>
      {
        val jsonInput: JValue = parse(x)


        /*Do all the transformations using Json libraries*/

        val json4s_transformed = "transformed json"

        val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
        val df = sqlContext.read.schema(outSchema).json(rdd)

 //Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once       
/*        df.write.option("spark.mongodb.output.uri", "connectionURI")
                  .option("collection", "Collection")
                  .mode("append").format("com.mongodb.spark.sql").save()*/
        outDf = outDf.union(df)

      }

    }

    )


      //Added this part of the code in expectation to access the unioned dataframe and insert all messages at once
      //println(outDf.count())
      if(outDf.count() > 0)
      {
        outDf.write
                  .option("spark.mongodb.output.uri", "connectionURI")
                  .option("collection", "Collection")
                  .mode("append").format("com.mongodb.spark.sql").save()
      }


    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

}

【问题讨论】:

  • 我对“我们希望一次处理 DStream 中的所有消息”感到困惑。 DStreams 是无限的......你的意思是一次处理一个批处理间隔内的所有消息吗?
  • 是的,我的意思是一次处理一个批处理间隔内的所有消息。很抱歉造成混乱

标签: mongodb scala apache-spark spark-streaming


【解决方案1】:

听起来您想减少到mongodb的连接数,为此,您必须在提供连接时在代码中使用foreachPartition do mongodb 请参阅spec,代码将如下所示:

rdd.repartition(1).foreachPartition {
    //get instance of connection
    //write/read with batch to mongo
    //close connection
}

【讨论】:

  • 感谢您的意见。我会试试看。
猜你喜欢
  • 2021-06-13
  • 2016-05-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-08-07
  • 2019-06-10
  • 2020-11-02
  • 1970-01-01
相关资源
最近更新 更多