【问题标题】:Spark Streaming to Kafka slow performanceSpark Streaming 到 Kafka 性能缓慢
【发布时间】:2020-08-09 23:35:38
【问题描述】:

我正在编写一个 spark 流作业,它从 Kafka 读取数据,对记录进行一些更改并将结果发送到另一个 Kafka 集群。

这项工作的性能似乎很慢,处理速度约为每秒 70,000 条记录。抽样显示,30% 的时间花在读取数据和处理数据上,其余 70% 的时间花在向 Kafka 发送数据上。

我尝试调整 Kafka 配置、添加内存、更改批处理间隔,但唯一有效的更改是添加更多内核。

分析器:

Spark 作业详情:

max.cores 30
driver memory 6G
executor memory 16G
batch.interval 3 minutes
ingres rate 180,000 messages per second

生产者属性(我尝试了不同的变体)

def buildProducerKafkaProperties: Properties = {
  val producerConfig = new Properties
  producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  producerConfig.put(ProducerConfig.ACKS_CONFIG, "all")
  producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "200000")
  producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "2000")
  producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
  producerConfig.put(ProducerConfig.RETRIES_CONFIG, "0")
  producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "13421728")
  producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, "13421728")
  producerConfig
}

发送代码

 stream
    .foreachRDD(rdd => {    
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd
          .map(consumerRecord => doSomething(consumerRecord))
          .foreachPartition(partitionIter => {
            val producer = kafkaSinkBroadcast.value    
            partitionIter.foreach(row => {
              producer.send(kafkaTopic, row)
              producedRecordsAcc.add(1)
            })

        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })

版本

Spark Standalone cluster 2.3.1
Destination Kafka cluster 1.1.1
Kafka topic has 120 partitions 

谁能建议如何提高发送吞吐量?


2019 年 7 月更新

大小:每秒 150k 条消息,每条消息大约有 100 列。

主要设置

spark.cores.max = 30 # the cores balanced between all the workers.
spark.streaming.backpressure.enabled = true
ob.ingest.batch.duration= 3 minutes 
 

我尝试使用 rdd.repartition(30),但它使执行速度变慢了 ~10%

谢谢

【问题讨论】:

    标签: scala apache-kafka spark-streaming


    【解决方案1】:

    尝试使用如下重新分区 -

    val numPartitons = (执行器数量 * 执行器核心数)

    stream
        .repartition(numPartitons)
        .foreachRDD(rdd => {    
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
            rdd
              .map(consumerRecord => doSomething(consumerRecord))
              .foreachPartition(partitionIter => {
                val producer = kafkaSinkBroadcast.value    
                partitionIter.foreach(row => {
                  producer.send(kafkaTopic, row)
                  producedRecordsAcc.add(1)
                })
    
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
          })
    

    这将为您提供最佳性能。

    希望这会有所帮助。

    【讨论】:

    • 我试过了,性能下降了 10%,dag 显示了一个额外的步骤,我必须在 RDD 上进行重新分区,因为如果你在流上进行它会改变RDD 的类,你不能提交偏移量
    • 您能否分享您在每批中尝试处理的数据大小。还有您为执行器数量和执行器核心数设置的值。据我了解,一种固定配置不可能适合所有场景,根据数据大小和计算,需要更改执行器数量和执行器核心数量。
    猜你喜欢
    • 2018-10-05
    • 2020-07-25
    • 2018-08-23
    • 1970-01-01
    • 2018-01-13
    • 2019-08-08
    • 2021-06-17
    • 2016-03-12
    • 2018-05-17
    相关资源
    最近更新 更多