【问题标题】:Spark Streaming scala performance drastic slowSpark Streaming scala 性能极慢
【发布时间】:2018-10-05 16:21:33
【问题描述】:

我有以下代码:-

case class event(imei: String, date: String, gpsdt: String,dt: String,id: String)
case class historyevent(imei: String, date: String, gpsdt: String)
object kafkatesting {
def main(args: Array[String]) {

val clients = new RedisClientPool("192.168.0.40", 6379)
val conf = new SparkConf()
  .setAppName("KafkaReceiver")
  .set("spark.cassandra.connection.host", "192.168.0.40")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")
  .set("spark.executor.memory", "3g")
  .set("spark.driver.memory", "4g")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "4")
  .set("spark.executor.cores", "3")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "100")
  .set("spark.streaming.kafka.maxRatePerPartition", "7")

val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "192.168.0.113:9092",
  "group.id" -> "test-group-aditya",
  "auto.offset.reset" -> "largest")

val topics = Set("random")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

kafkaStream.foreachRDD { rdd =>

  val updatedRDD = rdd.map(a =>
    {
      implicit val formats = DefaultFormats
      val jValue = parse(a._2)
      val fleetrecord = jValue.extract[historyevent]
      val hash = fleetrecord.imei + fleetrecord.date + fleetrecord.gpsdt
      val md5Hash = DigestUtils.md5Hex(hash).toUpperCase()
      val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())

      event(fleetrecord.imei, fleetrecord.date, fleetrecord.gpsdt, now, md5Hash)
    })
    .collect()

  updatedRDD.foreach(f =>
    {
      clients.withClient {
        client =>
          {
            val value = f.imei + " , " + f.gpsdt
            val zscore = Calendar.getInstance().getTimeInMillis
            val key = new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime())
            val dt = new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime())
            val q1 = "00:00:00"
            val q2 = "06:00:00"
            val q3 = "12:00:00"
            val q4 = "18:00:00"
            val quater = if (dt > q1 && dt < q2) {
              System.out.println(dt + " lies in quarter 1");
              " -> 1"
            } else if (dt > q2 && dt < q3) {
              System.out.println(dt + " lies in quarter 2");
              " -> 2"
            } else if (dt > q3 && dt < q4) {
              System.out.println(dt + " lies in quarter 3");
              " -> 3"
            } else {
              System.out.println(dt + " lies in quarter 4");
              " -> 4"
            }
            client.zadd(key + quater, zscore, value)
            println(f.toString())
          }
      }
    })
  val collection = sc.parallelize(updatedRDD)
  collection.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt","dt","id"))
}

ssc.start()
ssc.awaitTermination()
}
}

我正在使用此代码将数据从 Kafka 插入 Cassandra 和 Redis,但面临以下问题:-

1) 应用程序创建一个长的活动批次队列,而当前正在处理前一个批次。所以,我只想在上一批执行完毕后再进行下一批。

2) 我有一个四节点集群,它正在处理每个批次,但执行 700 条记录大约需要 30-40 秒。

我的代码是否经过优化,或者我需要处理我的代码以获得更好的性能?

【问题讨论】:

  • 我不确定 Redis,但您将数据保存到 cassandra 的方式是错误的。有API可以将Dstream直接保存到cassandra中,无需采集转换为RDD。也尝试使用mapPartition 而不是foreachRDD。您可以查看包裹com.datastax.spark.connector.streaming
  • @vindev - 嗨,正如你所见,我实际上是从 Dstream 中提取数据并将一些列添加到每一行,然后将其保存到 cassandra。那么,这可能按照您的建议方式进行吗?你能分享你建议的更新代码吗?
  • 不确定 spark-streaming 相关问题,但在每个事件上创建 SimpleDateFormatCalendar 非常浪费资源。 println 调用每个事件也很慢。

标签: scala redis apache-kafka spark-streaming spark-cassandra-connector


【解决方案1】:

是的,您可以在 mapPartition 内完成所有工作。 datastax 的 API 允许您直接保存 Dstream。以下是 C* 的方法。

val partitionedDstream = kafkaStream.repartition(5) //change this value as per your data and spark cluster

//Now instead of iterating each RDD work on each partition.
val eventsStream: DStream[event] = partitionedDstream.mapPartitions(x => {
  val lst = scala.collection.mutable.ListBuffer[event]()
  while (x.hasNext) {
    val a = x.next()
    implicit val formats = DefaultFormats
    val jValue = parse(a._2)
    val fleetrecord = jValue.extract[historyevent]
    val hash = fleetrecord.imei + fleetrecord.date + fleetrecord.gpsdt
    val md5Hash = DigestUtils.md5Hex(hash).toUpperCase()
    val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
    lst += event(fleetrecord.imei, fleetrecord.date, fleetrecord.gpsdt, now, md5Hash)
  }
  lst.toList.iterator
})

eventsStream.cache() //because you are using same Dstream for C* and Redis

//instead of collecting each RDD save whole Dstream at once
import com.datastax.spark.connector.streaming._
eventsStream.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "dt", "id"))

cassandra 还接受 timestamp 作为 Long 值,因此您还可以更改代码的某些部分,如下所示

val now = System.currentTimeMillis()

//also change your case class to take `Long` instead of `String`
case class event(imei: String, date: String, gpsdt: String, dt: Long, id: String)

同样,您也可以更改为 Redis

【讨论】:

  • 未找到:输入 DStream
  • 使用import org.apache.spark.streaming.dstream.DStream我添加了类型只是为了参考你可以删除类型。
  • 好的,我做到了,val partitionedDstream = kafkaStream.repartition(5),这个语句意味着什么?
  • 它将为您的 Dstream 创建 5 个分区。阅读更多关于分区的信息jaceklaskowski.gitbooks.io/mastering-apache-spark/content/…
  • 其实我现在已经开始了我的工作,我可以看到它在分区记录上花费了很多时间。
猜你喜欢
  • 1970-01-01
  • 2020-09-28
  • 2021-06-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-08-10
  • 1970-01-01
  • 2022-12-21
相关资源
最近更新 更多