【发布时间】:2020-08-09 23:35:38
【问题描述】:
我正在编写一个 spark 流作业,它从 Kafka 读取数据,对记录进行一些更改并将结果发送到另一个 Kafka 集群。
这项工作的性能似乎很慢,处理速度约为每秒 70,000 条记录。抽样显示,30% 的时间花在读取数据和处理数据上,其余 70% 的时间花在向 Kafka 发送数据上。
我尝试调整 Kafka 配置、添加内存、更改批处理间隔,但唯一有效的更改是添加更多内核。
Spark 作业详情:
max.cores 30
driver memory 6G
executor memory 16G
batch.interval 3 minutes
ingres rate 180,000 messages per second
生产者属性(我尝试了不同的变体)
def buildProducerKafkaProperties: Properties = {
val producerConfig = new Properties
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all")
producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "200000")
producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "2000")
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
producerConfig.put(ProducerConfig.RETRIES_CONFIG, "0")
producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "13421728")
producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, "13421728")
producerConfig
}
发送代码
stream
.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
.map(consumerRecord => doSomething(consumerRecord))
.foreachPartition(partitionIter => {
val producer = kafkaSinkBroadcast.value
partitionIter.foreach(row => {
producer.send(kafkaTopic, row)
producedRecordsAcc.add(1)
})
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
版本
Spark Standalone cluster 2.3.1
Destination Kafka cluster 1.1.1
Kafka topic has 120 partitions
谁能建议如何提高发送吞吐量?
2019 年 7 月更新
大小:每秒 150k 条消息,每条消息大约有 100 列。
主要设置:
spark.cores.max = 30 # the cores balanced between all the workers.
spark.streaming.backpressure.enabled = true
ob.ingest.batch.duration= 3 minutes
我尝试使用 rdd.repartition(30),但它使执行速度变慢了 ~10%
谢谢
【问题讨论】:
标签: scala apache-kafka spark-streaming