【发布时间】:2018-05-23 11:35:01
【问题描述】:
我们使用 Kafka 0.10 和 Spark 2.1,我发现我们的生产者发布消息总是很慢。在给 Spark 执行器提供 8 个内核后,我只能达到 1k/s 左右,而其他帖子说他们很容易达到数百万/秒。 我试图调整 linger.ms 和 batch.size 来找出答案。但是我发现 linger.ms=0 对我来说似乎是最佳选择,而 batch.size 并没有太大影响。我每次迭代发送 160k 个事件。看起来我必须启用 Kafka Producer Metrics 才能知道到底发生了什么。但是貌似在 Spark Executor 中启用它并不是很容易。
谁能给我分享一些灯?
我的代码是这样的:
private def publishMessagesAttempt(producer: KafkaProducer[String, String], topic: String, messages: Iterable[(String, String)], producerMaxDelay: Long,
individualMessageMaxDelay: Long, logger: (String, Boolean) => Unit = KafkaClusterUtils.DEFAULT_LOGGER): Iterable[(String, String)] = {
val futureMessages = messages.map(message => (message, producer.send(new ProducerRecord[String, String](topic, message._1, message._2))))
val messageSentTime = System.currentTimeMillis
val awaitedResults = futureMessages.map { case (message, future) =>
val waitFor = Math.max(producerMaxDelay - (System.currentTimeMillis - messageSentTime), individualMessageMaxDelay)
val failed = Try(future.get(waitFor, TimeUnit.MILLISECONDS)) match {
case Success(_) => false
case Failure(f) =>
logger(s"Error happened when publish to Kafka: ${f.getStackTraceString}", true)
true
}
(message, failed)
}
awaitedResults.filter(_._2).map(_._1)
}
【问题讨论】: