Question title: Spark Streaming - DataFrame Collect Performance Issue
Posted: 2022-09-22 20:51:28
Question description:

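I am trying to improve the performance of a Spark Streaming application. In each streaming loop I generate a new DataFrame for every record consumed from the topic. I then need to collect the list of values from this DataFrame to feed the analytical model stage.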

These are the steps of my application:

1- Read from Kafka
For each record (loop):
    2- Generate a new dataframe by joining static dataframe with new topic dataframe (Columns : key,value)
    3- Collect value list from dataframe. (CollectDF function)
    4- Calling pmml model
    ...
    2- Generate a new dataframe by joining static dataframe with new topic dataframe (Columns : key,value)
    3- Collect value list from dataframe. (CollectDF function)
    4- Calling pmml model
    ...
    
If there are 10 records in the topic, this cycle runs 10 times. At first, the CollectDF step takes 1-2 seconds, but after a few cycles of the loop it takes 8-10 seconds.
I don't understand how this is possible. How can I keep the processing time stable?
     

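      kafkaStream.foreachRDD { rdd =>
        // construction of stream_df and df_model (the join from step 2) is elided in the original
        stream_df.collect().foreach { row =>
          ...
          // CollectDF collects the joined df_model, so every record triggers its own Spark job
          val model_feature_list = CollectDF(df_model)
          val predictions = model.predict(model_feature_list)
        }
      }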
    
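      import org.apache.spark.sql.DataFrame
      import scala.collection.immutable.SortedMap

      // Collects (key, value) rows to the driver and returns the values ordered by key
      def CollectDF(df_modelparam: DataFrame): Array[Int] = {
        val x: SortedMap[String, Int] = SortedMap(
          df_modelparam.collect().map { r =>
            val key = r(0).toString
            val value = r(1).toString.toInt
            key -> value
          }.toSeq: _*
        )
        x.values.toArray
      }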
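The driver-side part of CollectDF can be checked without Spark. In the sketch below, `collectDFLocal` is a hypothetical name, and plain (key, value) tuples stand in for the `Row`s that `df_model.collect()` returns. The sort-and-extract step is trivial for a handful of rows. That suggests the seconds are being spent in the Spark job that `collect` triggers (the join), not in this logic.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical stand-in for df_model.collect(): each pair mirrors
// (r(0), r(1)) in CollectDF, so no Spark session is needed here.
def collectDFLocal(rows: Seq[(Any, Any)]): Array[Int] = {
  // Sort by the key's string form, as CollectDF's SortedMap does
  val byKey: SortedMap[String, Int] =
    SortedMap(rows.map { case (k, v) => k.toString -> v.toString.toInt }: _*)
  // Feature vector = values in key order
  byKey.values.toArray
}

println(collectDFLocal(Seq(("b", "2"), ("a", "1"), ("c", "3"))).mkString(","))
// prints 1,2,3
```

Because the keys are compared as strings, "10" sorts before "2". If the keys are numeric feature indices, this order may not match what the PMML model expects, so it is worth checking.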

Thanks in advance.

    Tags: performance apache-spark rdd collect


    Answer 1:

    May I ask why you are collecting the data to the driver?

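    Ideally, you should avoid calling collect() in Spark Streaming use cases. It is an action that launches a Spark job and pulls every row back to the driver, so it is expensive and can slow the application down.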
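One possible restructuring, which is an assumption and not something this answer proposes: tag each Kafka record with an id and join the whole micro-batch against the static DataFrame once. Then call collect once and split the rows per record on the driver. That replaces one collect job per record with a single job per batch. `JoinedRow`, `recordId` and `featuresPerRecord` are hypothetical names, and plain Scala collections stand in for the single collected array.

```scala
// Hypothetical: every joined row for a micro-batch, collected once,
// tagged with the id of the Kafka record it came from.
final case class JoinedRow(recordId: String, key: String, value: Int)

// Split the single collected array into one feature vector per record,
// ordering each vector by key as CollectDF does (assumes unique keys per record).
def featuresPerRecord(rows: Seq[JoinedRow]): Map[String, Array[Int]] =
  rows.groupBy(_.recordId).map { case (id, rs) =>
    id -> rs.sortBy(_.key).map(_.value).toArray
  }

val batch = Seq(
  JoinedRow("r1", "b", 2), JoinedRow("r1", "a", 1), JoinedRow("r2", "a", 5)
)
featuresPerRecord(batch).foreach { case (id, fs) => println(s"$id -> ${fs.mkString(",")}") }
```

Each resulting vector would then go to `model.predict` as before. Only the number of Spark jobs changes.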

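    Instead of collecting the data to the driver, you could try something like the following on the streaming DataFrame itself, so the work stays on the executors.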

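    // mapPartitions needs an Encoder for the result type; assumes a SparkSession named `spark`
    import spark.implicits._

    streamingDF.mapPartitions { rowIterator =>
      // map (not foreach) so the function returns an Iterator, as mapPartitions requires
      rowIterator.map { row =>
        val key = row(0).toString
        val value = row(1).toString.toInt
        // analytical use case on the above key, value being created
        (key -> value)
      }
    }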
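Why the body has to use `map` rather than `foreach`: `Dataset.mapPartitions` takes a function from `Iterator[T]` to `Iterator[U]`. The sketch below uses plain Scala iterators to stand in for one partition, with no Spark involved. `consume` and `transform` are hypothetical names. `consume` has the `foreach` shape, which returns `Unit` and so cannot serve as a mapPartitions body. `transform` has the `map` shape, which returns a new iterator.

```scala
// Wrong shape: foreach consumes the iterator and returns Unit.
def consume(it: Iterator[(String, String)]): Unit =
  it.foreach { case (k, v) => (k, v.toInt) }

// Right shape: map lazily turns each row into a (key, value) pair.
def transform(it: Iterator[(String, String)]): Iterator[(String, Int)] =
  it.map { case (k, v) => (k, v.toInt) }

val partition = Seq(("a", "1"), ("b", "2"))
println(transform(partition.iterator).toList)
// prints List((a,1), (b,2))
```

`Iterator.map` is lazy: nothing is parsed until the result is consumed. Here that happens via `toList`; in Spark it happens when the executor pulls rows from the partition.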
    
