【Question title】: How to enable Kafka Producer Metrics in Spark?
【Posted】: 2018-05-23 11:35:01
【Question】:

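We use Kafka 0.10 and Spark 2.1, and I have found that our producer always publishes messages slowly. Even after giving the Spark executors 8 cores, I only reach about 1k/s, while other posts say they easily reach millions per second. I tried tuning linger.ms and batch.size to find the cause, but linger.ms=0 seems to be the best choice for me, and batch.size makes little difference. I send 160k events per iteration. It looks like I have to enable Kafka Producer Metrics to find out what is really happening, but enabling them inside a Spark executor does not seem easy.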
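
For reference, linger.ms and batch.size are ordinary producer properties. The following is a minimal sketch of how they are usually set, using only `java.util.Properties` so it runs without the Kafka client on the classpath. The class name, the helper name `producerProps`, and the broker address `localhost:9092` are placeholders for illustration; 16384 bytes is Kafka's default batch.size.

```java
import java.util.Properties;

final class ProducerTuningSketch {
    // Builds producer properties with the two batching settings discussed above.
    // The helper name and its parameters are hypothetical, not part of any Kafka API.
    static Properties producerProps(String bootstrapServers, int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // linger.ms: how long the producer waits for more records before sending a batch.
        props.setProperty("linger.ms", Integer.toString(lingerMs));
        // batch.size: upper bound, in bytes, on a single per-partition batch.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", 0, 16384);
        System.out.println("linger.ms=" + props.getProperty("linger.ms")
                + ", batch.size=" + props.getProperty("batch.size"));
    }
}
```

The resulting `Properties` object is what would be passed to the `KafkaProducer` constructor.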

Can anyone shed some light on this?

My code looks like this:

import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.{Failure, Success, Try}

private def publishMessagesAttempt(producer: KafkaProducer[String, String], topic: String, messages: Iterable[(String, String)], producerMaxDelay: Long,
                                   individualMessageMaxDelay: Long, logger: (String, Boolean) => Unit = KafkaClusterUtils.DEFAULT_LOGGER): Iterable[(String, String)] = {
  // Queue every record first; send() is asynchronous and returns a Future.
  val futureMessages = messages.map(message => (message, producer.send(new ProducerRecord[String, String](topic, message._1, message._2))))
  val messageSentTime = System.currentTimeMillis
  // Wait for each acknowledgement, bounded by the remaining time budget.
  val awaitedResults = futureMessages.map { case (message, future) =>
    val waitFor = Math.max(producerMaxDelay - (System.currentTimeMillis - messageSentTime), individualMessageMaxDelay)
    val failed = Try(future.get(waitFor, TimeUnit.MILLISECONDS)) match {
      case Success(_) => false
      case Failure(f) =>
        logger(s"Error happened when publish to Kafka: ${f.getStackTraceString}", true)
        true
    }
    (message, failed)
  }
  // Return only the messages that failed to publish.
  awaitedResults.filter(_._2).map(_._1)
}

【Discussion】:

    Tags: apache-spark apache-kafka


    【Solution 1】:

    I finally found the answer. 1. KafkaProducer has a metrics() function that returns the producer's metrics. Simply printing them is enough.

    Code along these lines should work:

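    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MetricsProducerReporter implements Runnable {
        private final Producer<String, StockPrice> producer;
        private final Logger logger =
                LoggerFactory.getLogger(MetricsProducerReporter.class);

        // Used to filter just the metrics we want
        private final Set<String> metricsNameFilter = new HashSet<>(Arrays.asList(
                "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
                "request-size-max", "network-io-rate",
                "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
        ));

        public MetricsProducerReporter(
                final Producer<String, StockPrice> producer) {
            this.producer = producer;
        }

        @Override
        public void run() {
            // Poll and print the producer metrics every 3 seconds until interrupted
            while (true) {
                final Map<MetricName, ? extends Metric> metrics
                        = producer.metrics();

                displayMetrics(metrics);
                try {
                    Thread.sleep(3_000);
                } catch (InterruptedException e) {
                    logger.warn("metrics interrupted");
                    // Restore the interrupt flag instead of clearing it
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        // displayMetrics(...) is not shown in this snippet.
    }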
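
The snippet above calls `displayMetrics(...)` without showing its body. Here is a minimal sketch of what such a filter-and-print step might look like. To run without the Kafka client it uses a plain `Map<String, Double>` as a stand-in for `producer.metrics()`; with the real client the keys are `MetricName` objects and the name would come from `MetricName.name()`. The class name, the shortened filter set, and the sample values in `main` are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class MetricsFilterSketch {
    // A shortened version of the name filter from the answer.
    static final Set<String> WANTED = new HashSet<>(Arrays.asList(
            "record-send-rate", "batch-size-avg", "requests-in-flight"));

    // Keeps only the wanted metric names, sorted by name for stable output.
    static Map<String, Double> filter(Map<String, Double> all) {
        Map<String, Double> kept = new TreeMap<>();
        for (Map.Entry<String, Double> e : all.entrySet()) {
            if (WANTED.contains(e.getKey())) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    // Renders one "name value" line per metric.
    static String format(Map<String, Double> metrics) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Double> e : metrics.entrySet()) {
            sb.append(String.format(Locale.ROOT, "%-22s %.2f%n", e.getKey(), e.getValue()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not real measurements.
        Map<String, Double> sample = new LinkedHashMap<>();
        sample.put("record-send-rate", 1000.0);
        sample.put("io-wait-ratio", 0.9);
        sample.put("batch-size-avg", 512.0);
        System.out.print(format(filter(sample)));
    }
}
```

Inside a Spark executor, printing to standard output or to a logger means the values end up in the executor logs.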
    
    2. My code was slow because Scala's map does not run in parallel by default. I had to use messages.par.map() to get parallelism.
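
As a rough Java analogue of switching from map() to par.map(), the sketch below contrasts a sequential stream with a parallel stream over the same messages. `blockingSend` is a hypothetical stand-in for a blocking publish such as `producer.send(record).get()`; it only sleeps, so the timings it prints say nothing about real Kafka throughput. KafkaProducer is documented as thread safe, which is what makes sharing one producer across parallel workers reasonable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ParallelSendSketch {
    // Hypothetical stand-in for a blocking publish; it simply sleeps for 20 ms.
    static String blockingSend(String message) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "sent:" + message;
    }

    // One message at a time, like a plain map().
    static List<String> sendSequential(List<String> messages) {
        return messages.stream()
                .map(ParallelSendSketch::blockingSend)
                .collect(Collectors.toList());
    }

    // Messages spread over the common fork-join pool, like par.map().
    // Collecting to a list keeps the original encounter order.
    static List<String> sendParallel(List<String> messages) {
        return messages.parallelStream()
                .map(ParallelSendSketch::blockingSend)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");

        long t0 = System.nanoTime();
        sendSequential(messages);
        long t1 = System.nanoTime();
        sendParallel(messages);
        long t2 = System.nanoTime();

        System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
        System.out.println("parallel ms:   " + (t2 - t1) / 1_000_000);
    }
}
```

How much this helps depends on where the time is actually spent, which is what the producer metrics are meant to reveal.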
